diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 7d027c995..4717a24de 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -709,7 +709,6 @@ grpc-auth is not directly used transitively, but is pulled to align with other grpc parts opencensus-impl-core is brought in transitively through opencensus-impl --> - io.grpc:grpc-auth io.opencensus:opencensus-impl-core diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index ecbef85be..7495ca6ce 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -16,18 +16,27 @@ package com.google.cloud.bigtable.data.v2.stub; import com.google.api.core.BetaApi; -import com.google.api.gax.core.FixedCredentialsProvider; -import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.ChannelPrimer; -import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.auth.Credentials; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.InstanceName; import com.google.bigtable.v2.PingAndWarmRequest; -import com.google.cloud.bigtable.data.v2.internal.NameUtil; -import com.google.common.base.Preconditions; +import com.google.bigtable.v2.PingAndWarmResponse; +import io.grpc.CallCredentials; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.Deadline; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.auth.MoreCallCredentials; import java.io.IOException; -import java.util.concurrent.ExecutionException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -41,27 +50,40 @@ class BigtableChannelPrimer implements ChannelPrimer { private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); - private final EnhancedBigtableStubSettings settingsTemplate; + static final Metadata.Key REQUEST_PARAMS = + Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER); + private final PingAndWarmRequest request; + private final CallCredentials callCredentials; + private final Map headers; static BigtableChannelPrimer create( - Credentials credentials, String projectId, String instanceId, String appProfileId) { - EnhancedBigtableStubSettings.Builder builder = - EnhancedBigtableStubSettings.newBuilder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .setAppProfileId(appProfileId) - .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) - // Disable refreshing channel here to avoid creating settings in a loop - .setRefreshingChannel(false) - .setExecutorProvider( - InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()); - - return new BigtableChannelPrimer(builder.build()); + String projectId, + String instanceId, + String appProfileId, + Credentials credentials, + Map headers) { + return new BigtableChannelPrimer(projectId, instanceId, appProfileId, credentials, headers); } - private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) { - Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); - this.settingsTemplate = settingsTemplate; + BigtableChannelPrimer( + String projectId, + String instanceId, + String appProfileId, + Credentials credentials, + Map headers) { + if (credentials != null) { + callCredentials = MoreCallCredentials.from(credentials); + } else { + callCredentials = null; + } + + request = + PingAndWarmRequest.newBuilder() + .setName(InstanceName.format(projectId, instanceId)) + .setAppProfileId(appProfileId) + .build(); + + this.headers = headers; } @Override @@ -69,8 +91,7 @@ public void primeChannel(ManagedChannel managedChannel) { try { primeChannelUnsafe(managedChannel); } catch (IOException | RuntimeException e) { - LOG.warning( - String.format("Unexpected error while trying to prime a channel: %s", e.getMessage())); + LOG.log(Level.WARNING, "Unexpected error while trying to prime a channel", e); } } @@ -78,35 +99,64 @@ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOExceptio sendPrimeRequests(managedChannel); } - private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { - // Wrap the channel in a temporary stub - EnhancedBigtableStubSettings primingSettings = - settingsTemplate - .toBuilder() - .setTransportChannelProvider( - FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel))) - .build(); + private void sendPrimeRequests(ManagedChannel managedChannel) { + try { + ClientCall clientCall = + managedChannel.newCall( + BigtableGrpc.getPingAndWarmMethod(), + CallOptions.DEFAULT + .withCallCredentials(callCredentials) + .withDeadline(Deadline.after(1, TimeUnit.MINUTES))); - try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { - PingAndWarmRequest request = - PingAndWarmRequest.newBuilder() - .setName( - NameUtil.formatInstanceName( - primingSettings.getProjectId(), primingSettings.getInstanceId())) - .setAppProfileId(primingSettings.getAppProfileId()) - .build(); - - try { - stub.pingAndWarmCallable().call(request); - } catch (Throwable e) { - // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping - // channels if the new - // channel is bad. - if (e instanceof ExecutionException) { - e = e.getCause(); - } - LOG.warning(String.format("Failed to prime channel: %s", e)); - } + SettableApiFuture future = SettableApiFuture.create(); + clientCall.start( + new ClientCall.Listener() { + PingAndWarmResponse response; + + @Override + public void onMessage(PingAndWarmResponse message) { + response = message; + } + + @Override + public void onClose(Status status, Metadata trailers) { + if (status.isOk()) { + future.set(response); + } else { + future.setException(status.asException()); + } + } + }, + createMetadata(headers, request)); + clientCall.sendMessage(request); + clientCall.halfClose(); + clientCall.request(Integer.MAX_VALUE); + + future.get(1, TimeUnit.MINUTES); + } catch (Throwable e) { + // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping + // channels if the new + // channel is bad. + LOG.log(Level.WARNING, "Failed to prime channel", e); } } + + private static Metadata createMetadata(Map headers, PingAndWarmRequest request) { + Metadata metadata = new Metadata(); + + headers.forEach( + (k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v)); + try { + metadata.put( + REQUEST_PARAMS, + String.format( + "name=%s&app_profile_id=%s", + URLEncoder.encode(request.getName(), "UTF-8"), + URLEncoder.encode(request.getAppProfileId(), "UTF-8"))); + } catch (UnsupportedEncodingException e) { + LOG.log(Level.WARNING, "Failed to encode request params", e); + } + + return metadata; + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java index a8e18f364..a2587b0dd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java @@ -100,10 +100,11 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings if (builder.isRefreshingChannel()) { transportProvider.setChannelPrimer( BigtableChannelPrimer.create( + builder.getProjectId(), + builder.getInstanceId(), + builder.getAppProfileId(), credentials, - settings.getProjectId(), - settings.getInstanceId(), - settings.getAppProfileId())); + builder.getHeaderProvider().getHeaders())); } builder.setTransportChannelProvider(transportProvider.build()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 5cab91c92..46377fbc4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -61,8 +61,6 @@ import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; -import com.google.bigtable.v2.PingAndWarmRequest; -import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadChangeStreamRequest; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.ReadRowsRequest; @@ -188,7 +186,6 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable externalBulkMutateRowsCallable; private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; - private final UnaryCallable pingAndWarmCallable; private final ServerStreamingCallable generateInitialChangeStreamPartitionsCallable; @@ -321,7 +318,6 @@ public EnhancedBigtableStub( createGenerateInitialChangeStreamPartitionsCallable(); readChangeStreamCallable = createReadChangeStreamCallable(new DefaultChangeStreamRecordAdapter()); - pingAndWarmCallable = createPingAndWarmCallable(); executeQueryCallable = createExecuteQueryCallable(); } @@ -1252,28 +1248,6 @@ ServerStreamingCallSettings convertUnaryToServerStreamingSettings( .build(); } - private UnaryCallable createPingAndWarmCallable() { - UnaryCallable pingAndWarm = - GrpcRawCallableFactory.createUnaryCallable( - GrpcCallSettings.newBuilder() - .setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod()) - .setParamsExtractor( - new RequestParamsExtractor() { - @Override - public Map extract(PingAndWarmRequest request) { - return ImmutableMap.of( - "name", request.getName(), - "app_profile_id", request.getAppProfileId()); - } - }) - .build(), - Collections.emptySet()); - return pingAndWarm.withDefaultCallContext( - clientContext - .getDefaultCallContext() - .withRetrySettings(settings.pingAndWarmSettings().getRetrySettings())); - } - private UnaryCallable withRetries( UnaryCallable innerCallable, UnaryCallSettings unaryCallSettings) { UnaryCallable retrying; @@ -1381,10 +1355,6 @@ public ExecuteQueryCallable executeQueryCallable() { return executeQueryCallable; } - UnaryCallable pingAndWarmCallable() { - return pingAndWarmCallable; - } - // private SpanName getSpanName(String methodName) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index e1f22bebb..709b48247 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -24,6 +24,7 @@ import com.google.bigtable.v2.PingAndWarmRequest; import com.google.bigtable.v2.PingAndWarmResponse; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.common.collect.ImmutableMap; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; @@ -69,10 +70,11 @@ public void setup() throws IOException { primer = BigtableChannelPrimer.create( - OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), "fake-project", "fake-instance", - "fake-app-profile"); + "fake-app-profile", + OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), + ImmutableMap.of("bigtable-feature", "fake-feature")); channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); @@ -133,7 +135,7 @@ public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { - assertThat(log.getMessage()).contains("FAILED_PRECONDITION"); + assertThat(log.getThrown().getMessage()).contains("FAILED_PRECONDITION"); } } @@ -146,7 +148,21 @@ public void testChannelErrorsAreLogged() { assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { - assertThat(log.getMessage()).contains("UnsupportedOperationException"); + assertThat(log.getThrown()).isInstanceOf(UnsupportedOperationException.class); + } + } + + @Test + public void testHeadersAreSent() { + primer.primeChannel(channel); + + for (Metadata metadata : metadataInterceptor.metadataList) { + assertThat(metadata.get(BigtableChannelPrimer.REQUEST_PARAMS)) + .isEqualTo( + "name=projects%2Ffake-project%2Finstances%2Ffake-instance&app_profile_id=fake-app-profile"); + assertThat( + metadata.get(Metadata.Key.of("bigtable-feature", Metadata.ASCII_STRING_MARSHALLER))) + .isEqualTo("fake-feature"); } }