Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: send priming requests on the channel directly #2435

Merged
merged 15 commits into from
Dec 3, 2024
1 change: 0 additions & 1 deletion google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@
grpc-auth is not directly used transitively, but is pulled to align with other grpc parts
opencensus-impl-core is brought in transitively through opencensus-impl
-->
<usedDependencies>io.grpc:grpc-auth</usedDependencies>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>io.opencensus:opencensus-impl-core</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.common.base.Preconditions;
import com.google.bigtable.v2.PingAndWarmResponse;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -41,72 +50,113 @@
class BigtableChannelPrimer implements ChannelPrimer {
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());

private final EnhancedBigtableStubSettings settingsTemplate;
static final Metadata.Key<String> REQUEST_PARAMS =
Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
private final PingAndWarmRequest request;
private final CallCredentials callCredentials;
private final Map<String, String> headers;

static BigtableChannelPrimer create(
Credentials credentials, String projectId, String instanceId, String appProfileId) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
.setInstanceId(instanceId)
.setAppProfileId(appProfileId)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
// Disable refreshing channel here to avoid creating settings in a loop
.setRefreshingChannel(false)
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());

return new BigtableChannelPrimer(builder.build());
String projectId,
String instanceId,
String appProfileId,
Credentials credentials,
Map<String, String> headers) {
return new BigtableChannelPrimer(projectId, instanceId, appProfileId, credentials, headers);
}

private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
BigtableChannelPrimer(
String projectId,
String instanceId,
String appProfileId,
Credentials credentials,
Map<String, String> headers) {
if (credentials != null) {
callCredentials = MoreCallCredentials.from(credentials);
} else {
callCredentials = null;
}

request =
PingAndWarmRequest.newBuilder()
.setName(InstanceName.format(projectId, instanceId))
.setAppProfileId(appProfileId)
.build();

this.headers = headers;
}

@Override
public void primeChannel(ManagedChannel managedChannel) {
try {
primeChannelUnsafe(managedChannel);
} catch (IOException | RuntimeException e) {
LOG.warning(
String.format("Unexpected error while trying to prime a channel: %s", e.getMessage()));
LOG.log(Level.WARNING, "Unexpected error while trying to prime a channel", e);
}
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
// Wrap the channel in a temporary stub
EnhancedBigtableStubSettings primingSettings =
settingsTemplate
.toBuilder()
.setTransportChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
.build();
private void sendPrimeRequests(ManagedChannel managedChannel) {
try {
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
managedChannel.newCall(
BigtableGrpc.getPingAndWarmMethod(),
CallOptions.DEFAULT
.withCallCredentials(callCredentials)
.withDeadline(Deadline.after(1, TimeUnit.MINUTES)));

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
NameUtil.formatInstanceName(
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

try {
stub.pingAndWarmCallable().call(request);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
SettableApiFuture<PingAndWarmResponse> future = SettableApiFuture.create();
clientCall.start(
new ClientCall.Listener<PingAndWarmResponse>() {
PingAndWarmResponse response;

@Override
public void onMessage(PingAndWarmResponse message) {
response = message;
}

@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
future.set(response);
} else {
future.setException(status.asException());
}
}
},
createMetadata(headers, request));
clientCall.sendMessage(request);
clientCall.halfClose();
clientCall.request(Integer.MAX_VALUE);

future.get(1, TimeUnit.MINUTES);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
LOG.log(Level.WARNING, "failed to prime channel", e);
}
}

private static Metadata createMetadata(Map<String, String> headers, PingAndWarmRequest request) {
Metadata metadata = new Metadata();

headers.forEach(
(k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v));
try {
metadata.put(
REQUEST_PARAMS,
String.format(
"name=%s&app_profile_id=%s",
URLEncoder.encode(request.getName(), "UTF-8"),
URLEncoder.encode(request.getAppProfileId(), "UTF-8")));
} catch (UnsupportedEncodingException e) {
LOG.log(Level.WARNING, "Failed to encode request params", e);
}

return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
if (builder.isRefreshingChannel()) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
builder.getProjectId(),
builder.getInstanceId(),
builder.getAppProfileId(),
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
builder.getHeaderProvider().getHeaders()));
}

builder.setTransportChannelProvider(transportProvider.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -188,7 +186,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

private final ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable;
Expand Down Expand Up @@ -321,7 +318,6 @@ public EnhancedBigtableStub(
createGenerateInitialChangeStreamPartitionsCallable();
readChangeStreamCallable =
createReadChangeStreamCallable(new DefaultChangeStreamRecordAdapter());
pingAndWarmCallable = createPingAndWarmCallable();
executeQueryCallable = createExecuteQueryCallable();
}

Expand Down Expand Up @@ -1252,28 +1248,6 @@ ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
.build();
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of(
"name", request.getName(),
"app_profile_id", request.getAppProfileId());
}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withRetrySettings(settings.pingAndWarmSettings().getRetrySettings()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying;
Expand Down Expand Up @@ -1381,10 +1355,6 @@ public ExecuteQueryCallable executeQueryCallable() {
return executeQueryCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}

// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
Expand Down Expand Up @@ -69,10 +70,11 @@ public void setup() throws IOException {

primer =
BigtableChannelPrimer.create(
OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
"fake-project",
"fake-instance",
"fake-app-profile");
"fake-app-profile",
OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
ImmutableMap.of("bigtable-feature", "fake-feature"));

channel =
ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
Expand Down Expand Up @@ -150,6 +152,20 @@ public void testChannelErrorsAreLogged() {
}
}

@Test
public void testHeadersAreSent() {
primer.primeChannel(channel);

for (Metadata metadata : metadataInterceptor.metadataList) {
assertThat(metadata.get(BigtableChannelPrimer.REQUEST_PARAMS))
.isEqualTo(
"name=projects%2Ffake-project%2Finstances%2Ffake-instance&app_profile_id=fake-app-profile");
assertThat(
metadata.get(Metadata.Key.of("bigtable-feature", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("fake-feature");
}
}

private static class MetadataInterceptor implements ServerInterceptor {
ConcurrentLinkedQueue<Metadata> metadataList = new ConcurrentLinkedQueue<>();

Expand Down
Loading