readRowCallable() {
return readRowCallable;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
new file mode 100644
index 0000000000..90f5958dc3
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.readrows;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiException;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsRequest.Builder;
+import com.google.bigtable.v2.RowSet;
+import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
+import com.google.cloud.bigtable.data.v2.models.RowAdapter;
+import com.google.cloud.bigtable.data.v2.stub.BigtableStreamResumptionStrategy;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.Base64;
+import java.util.logging.Logger;
+
+/**
+ * An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks:
+ *
+ * <ul>
+ *   <li>the row key of the last row that was read successfully
+ *   <li>the row key of the large row that could not be read
+ *   <li>the list of all large-row keys seen so far
+ * </ul>
+ *
+ * <p>Upon retry this class builds a request that omits the large rows and resumes from the last
+ * row key that was successfully read.
+ *
+ * <p>This class is considered an internal implementation detail and not meant to be used by
+ * applications.
+ */
+@InternalApi
+public class LargeReadRowsResumptionStrategy<RowT>
+    extends BigtableStreamResumptionStrategy<ReadRowsRequest, RowT> {
+ private static final Logger LOGGER =
+ Logger.getLogger(LargeReadRowsResumptionStrategy.class.getName());
+  private final RowAdapter<RowT> rowAdapter;
+ private ByteString lastSuccessKey = ByteString.EMPTY;
+ // Number of rows processed excluding Marker row.
+ private long numProcessed;
+ private ByteString largeRowKey = ByteString.EMPTY;
+  // The original request is rewritten on every resumption attempt, no matter how many times it has
+  // already failed. {@code previousFailedRequestRowset} keeps the row set of the previous failed
+  // attempt so that consecutive large-row keys are all excluded on subsequent retries.
+ private RowSet previousFailedRequestRowset = null;
+
+  public LargeReadRowsResumptionStrategy(RowAdapter<RowT> rowAdapter) {
+ this.rowAdapter = rowAdapter;
+ }
+
+ @Override
+ public boolean canResume() {
+ return true;
+ }
+
+ @Override
+  public StreamResumptionStrategy<ReadRowsRequest, RowT> createNew() {
+ return new LargeReadRowsResumptionStrategy<>(rowAdapter);
+ }
+
+ @Override
+ public RowT processResponse(RowT response) {
+ // Last key can come from both the last processed row key and a synthetic row marker. The
+ // synthetic row marker is emitted when the server has read a lot of data that was filtered out.
+ // The row marker can be used to trim the start of the scan, but does not contribute to the row
+ // limit.
+ lastSuccessKey = rowAdapter.getKey(response);
+
+ if (!rowAdapter.isScanMarkerRow(response)) {
+ // Only real rows count towards the rows limit.
+ numProcessed++;
+ }
+ return response;
+ }
+
+ public Throwable processError(Throwable throwable) {
+ ByteString rowKeyExtracted = extractLargeRowKey(throwable);
+ if (rowKeyExtracted != null) {
+ LOGGER.warning("skipping large row " + rowKeyExtracted);
+ this.largeRowKey = rowKeyExtracted;
+ numProcessed = numProcessed + 1;
+ }
+ return throwable;
+ }
+
+ private ByteString extractLargeRowKey(Throwable t) {
+ if (t instanceof ApiException
+ && ((ApiException) t).getReason() != null
+ && ((ApiException) t).getReason().equals("LargeRowReadError")) {
+ String rowKey = ((ApiException) t).getMetadata().get("rowKeyBase64Encoded");
+
+ byte[] decodedBytes = Base64.getDecoder().decode(rowKey);
+ return ByteString.copyFrom(decodedBytes);
+ }
+ return null;
+ }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This returns an updated request that excludes all row keys and ranges up to and including
+   * {@link #lastSuccessKey}, and that also excludes the last encountered large row key ({@link
+   * #largeRowKey}). This implementation additionally reduces the row limit of the request to
+   * account for the rows that have already been received.
+   */
+ @Override
+ public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) {
+
+ // An empty lastSuccessKey means that we have not successfully read the first row,
+ // so resume with the original request object.
+ if (lastSuccessKey.isEmpty() && largeRowKey.isEmpty()) {
+ return originalRequest;
+ }
+
+ RowSet remaining;
+ if (previousFailedRequestRowset == null) {
+ remaining = originalRequest.getRows();
+ } else {
+ remaining = previousFailedRequestRowset;
+ }
+
+ if (!lastSuccessKey.isEmpty()) {
+ remaining = RowSetUtil.erase(remaining, lastSuccessKey, !originalRequest.getReversed());
+ }
+ if (!largeRowKey.isEmpty()) {
+ remaining = RowSetUtil.eraseLargeRow(remaining, largeRowKey);
+ }
+ this.largeRowKey = ByteString.EMPTY;
+
+ previousFailedRequestRowset = remaining;
+
+ // Edge case: retrying a fulfilled request.
+ // A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it
+ // had a row limit, has seen enough rows. These requests are replaced with a marker request that
+ // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable
+ // for more details.
+ if (remaining == null
+ || (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == numProcessed)) {
+ return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER;
+ }
+
+ Builder builder = originalRequest.toBuilder().setRows(remaining);
+
+ if (originalRequest.getRowsLimit() > 0) {
+ Preconditions.checkState(
+ originalRequest.getRowsLimit() > numProcessed,
+ "Processed rows and number of large rows should not exceed the row limit in the original request");
+ builder.setRowsLimit(originalRequest.getRowsLimit() - numProcessed);
+ }
+
+ return builder.build();
+ }
+}
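
Note (illustrative, not part of the patch): the strategy above is only ever driven indirectly by
the retrying callable, so a compact walk-through of its lifecycle may help review. The sketch below
is a hedged example; "originalRequest" and "largeRowError" are assumed inputs, the latter being an
ApiException carrying reason "LargeRowReadError" and a base64-encoded "rowKeyBase64Encoded"
metadata entry (the same shape the integration test constructs).

import com.google.api.gax.rpc.ApiException;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;

class LargeRowResumptionSketch {
  static ReadRowsRequest resume(ReadRowsRequest originalRequest, ApiException largeRowError) {
    LargeReadRowsResumptionStrategy<Row> strategy =
        new LargeReadRowsResumptionStrategy<>(new DefaultRowAdapter());

    // 1. Every successfully merged row advances lastSuccessKey (and the processed-row count).
    strategy.processResponse(Row.create(ByteString.copyFromUtf8("r1"), ImmutableList.of()));

    // 2. A large-row failure is routed through processError(), which records the offending key.
    strategy.processError(largeRowError);

    // 3. The resume request drops everything up to and including "r1" and also excludes the
    //    recorded large-row key, so the retry skips both.
    return strategy.getResumeRequest(originalRequest);
  }
}
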
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java
index a78e7643b0..3d696213a6 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java
@@ -73,4 +73,24 @@ public static ServerStreamingCallable
return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}
+
+  public static <RequestT, ResponseT>
+      ServerStreamingCallable<RequestT, ResponseT> retryingForLargeRows(
+          ServerStreamingCallable<RequestT, ResponseT> innerCallable,
+          ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
+          ClientContext clientContext) {
+
+    ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
+
+    StreamingRetryAlgorithm<Void> retryAlgorithm =
+ new StreamingRetryAlgorithm<>(
+ new LargeRowRetryAlgorithm<>(),
+ new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
+
+    ScheduledRetryingExecutor<Void> retryingExecutor =
+ new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
+
+ return new RetryingServerStreamingCallable<>(
+ innerCallable, retryingExecutor, settings.getResumptionStrategy());
+ }
}
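
Note (hedged sketch, not in this hunk): retryingForLargeRows is intended to be used the same way as
Callables.retrying(...) above, just with the large-row-aware retry algorithm. The names below
(innerCallable, callSettings, clientContext) are assumptions standing in for the objects the stub
already builds for readRows, with callSettings carrying a LargeReadRowsResumptionStrategy.

ServerStreamingCallable<ReadRowsRequest, Row> withLargeRowRetries =
    Callables.retryingForLargeRows(innerCallable, callSettings, clientContext);
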
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java
new file mode 100644
index 0000000000..ef4e8a5b45
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.retrying;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
+import com.google.api.gax.retrying.RetryingContext;
+import com.google.api.gax.retrying.TimedAttemptSettings;
+import com.google.api.gax.rpc.ApiException;
+import com.google.protobuf.util.Durations;
+import com.google.rpc.RetryInfo;
+import javax.annotation.Nullable;
+
+/**
+ * This retry algorithm checks the metadata of an exception for additional error details. It also
+ * retries a {@link com.google.api.gax.rpc.FailedPreconditionException} whose ErrorDetails reason is
+ * "LargeRowReadError" (i.e. a large row). If the metadata has a RetryInfo field, its retry delay is
+ * used as the wait time between attempts.
+ */
+@InternalApi
+public class LargeRowRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {
+
+ @Override
+ public TimedAttemptSettings createNextAttempt(
+ Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
+ java.time.Duration retryDelay = extractRetryDelay(prevThrowable);
+ if (retryDelay != null) {
+ return prevSettings
+ .toBuilder()
+ .setRandomizedRetryDelayDuration(retryDelay)
+ .setAttemptCount(prevSettings.getAttemptCount() + 1)
+ .setOverallAttemptCount(prevSettings.getAttemptCount() + 1)
+ .build();
+ }
+ return null;
+ }
+
+ /** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
+ @Override
+ public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
+ return shouldRetry(null, previousThrowable, previousResponse);
+ }
+
+  /**
+   * If {@link RetryingContext#getRetryableCodes()} is not null, returns true if the status code of
+   * previousThrowable is in the {@link RetryingContext}'s list of retryable codes.
+   *
+   * <p>Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}.
+   */
+ @Override
+ public boolean shouldRetry(
+ @Nullable RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
+ if (extractRetryDelay(previousThrowable) != null) {
+ // First check if server wants us to retry
+ return true;
+ }
+ if (isLargeRowException(previousThrowable)) {
+ return true;
+ }
+ if (context != null && context.getRetryableCodes() != null) {
+ // Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
+ // of codes that should be retried.
+ return ((previousThrowable instanceof ApiException)
+ && context
+ .getRetryableCodes()
+ .contains(((ApiException) previousThrowable).getStatusCode().getCode()));
+ }
+
+ // Server didn't have retry information and there's no retry context, use the local status
+ // code config.
+ return previousThrowable instanceof ApiException
+ && ((ApiException) previousThrowable).isRetryable();
+ }
+
+ public boolean isLargeRowException(Throwable previousThrowable) {
+ return (previousThrowable != null)
+ && (previousThrowable instanceof ApiException)
+ && ((ApiException) previousThrowable).getReason() != null
+ && ((ApiException) previousThrowable).getReason().equals("LargeRowReadError");
+ }
+
+ static java.time.Duration extractRetryDelay(@Nullable Throwable throwable) {
+ if (throwable == null) {
+ return null;
+ }
+ if (!(throwable instanceof ApiException)) {
+ return null;
+ }
+ ApiException exception = (ApiException) throwable;
+ if (exception.getErrorDetails() == null) {
+ return null;
+ }
+ if (exception.getErrorDetails().getRetryInfo() == null) {
+ return null;
+ }
+ RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo();
+ return java.time.Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
+ }
+}
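
Note (illustrative only): the practical difference from the default result retry algorithm is
easiest to see on FAILED_PRECONDITION, which is normally terminal. In the sketch below,
"largeRowError" is assumed to be built like createLargeRowException(...) in the retry test further
down, and "plainError" is an ordinary non-retryable ApiException.

LargeRowRetryAlgorithm<Row> algorithm = new LargeRowRetryAlgorithm<>();

// Retried: FAILED_PRECONDITION whose ErrorDetails reason is "LargeRowReadError".
boolean retriesLargeRow = algorithm.shouldRetry(largeRowError, null); // true

// Not retried: the same status code without the large-row reason falls back to isRetryable().
boolean failsFast = algorithm.shouldRetry(plainError, null); // false
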
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
index 7ec29f8b77..7f5c39ec0a 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
@@ -345,7 +345,6 @@ private void onAttemptError(Throwable throwable) {
synchronized (lock) {
localCancellationCause = cancellationCause;
}
-
if (resumptionStrategy instanceof BigtableStreamResumptionStrategy) {
throwable = ((BigtableStreamResumptionStrategy) resumptionStrategy).processError(throwable);
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java
index 4ccf9167f4..50abc2bcde 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java
@@ -16,27 +16,68 @@
package com.google.cloud.bigtable.data.v2.it;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.TruthJUnit.assume;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.admin.v2.models.Table;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.bigtable.data.v2.models.TableId;
+import com.google.cloud.bigtable.test_helpers.env.CloudEnv;
+import com.google.cloud.bigtable.test_helpers.env.PrefixGenerator;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class LargeRowIT {
+
private static final Logger logger = Logger.getLogger(LargeRowIT.class.getName());
@ClassRule public static final TestEnvRule testEnvRule = new TestEnvRule();
+ private BigtableTableAdminClient tableAdminClient;
+ private Table table;
+ private String familyId = "cf";
+
+ @Before
+ public void setup() {
+ tableAdminClient = testEnvRule.env().getTableAdminClient();
+ String tableId = PrefixGenerator.newPrefix("LargeRowTest");
+ table = tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(familyId));
+ }
+
+ @After
+ public void tearDown() {
+ if (table != null) {
+ tableAdminClient.deleteTable(table.getId());
+ }
+ }
+
@Test
public void testWriteRead() throws Exception {
String rowKey = UUID.randomUUID().toString();
@@ -73,4 +114,169 @@ public void testWriteRead() throws Exception {
assertThat(row.getCells().get(0).getValue()).isEqualTo(largeValue);
assertThat(row.getCells().get(1).getValue()).isEqualTo(largeValue);
}
+
+  static class AccumulatingObserver implements ResponseObserver<Row> {
+
+    final List<Row> responses = Lists.newArrayList();
+    final SettableApiFuture<Void> completionFuture = SettableApiFuture.create();
+
+ void awaitCompletion() throws Throwable {
+ try {
+ completionFuture.get(10, TimeUnit.MINUTES);
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Override
+ public void onStart(StreamController controller) {}
+
+ @Override
+ public void onResponse(Row row) {
+ responses.add(row);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ completionFuture.setException(t);
+ }
+
+ @Override
+ public void onComplete() {
+ completionFuture.set(null);
+ }
+ }
+
+ // TODO: remove the ignore annotation once the server code (large row error with metadata) is
+ // released on prod
+ @Test
+ @Ignore("large-row-error with metadata yet to be released on prod")
+ public void read() throws Throwable {
+ assume()
+ .withMessage("Large row read errors are not supported by emulator")
+ .that(testEnvRule.env())
+ .isInstanceOf(CloudEnv.class);
+
+ BigtableDataClient client = testEnvRule.env().getDataClient();
+ String tableId = table.getId();
+ String familyId = this.familyId;
+ long timestampMicros = System.currentTimeMillis() * 1_000;
+
+ // small row creations
+ client.bulkMutateRows(
+ BulkMutation.create(tableId)
+ .add(
+ RowMutationEntry.create("r1")
+ .setCell(familyId, "qualifier", timestampMicros, "my-value"))
+ .add(
+ RowMutationEntry.create("r4")
+ .setCell(familyId, "qualifier", timestampMicros, "my-value"))
+ .add(
+ RowMutationEntry.create("r5")
+ .setCell(familyId, "qualifier", timestampMicros, "my-value"))
+ .add(
+ RowMutationEntry.create("r6")
+ .setCell(familyId, "qualifier", timestampMicros, "my-value")));
+
+ Row expectedRow1 =
+ Row.create(
+ ByteString.copyFromUtf8("r1"),
+ ImmutableList.of(
+ RowCell.create(
+ familyId,
+ ByteString.copyFromUtf8("qualifier"),
+ timestampMicros,
+ ImmutableList.of(),
+ ByteString.copyFromUtf8("my-value"))));
+
+ Row expectedRow4 =
+ Row.create(
+ ByteString.copyFromUtf8("r4"),
+ ImmutableList.of(
+ RowCell.create(
+ familyId,
+ ByteString.copyFromUtf8("qualifier"),
+ timestampMicros,
+ ImmutableList.of(),
+ ByteString.copyFromUtf8("my-value"))));
+
+ // large row creation
+ byte[] largeValueBytes = new byte[3 * 1024 * 1024];
+ ByteString largeValue = ByteString.copyFrom(largeValueBytes);
+
+ for (int i = 0; i < 100; i++) {
+ ByteString qualifier = ByteString.copyFromUtf8("qualifier1_" + "_" + String.valueOf(i));
+ client.mutateRow(
+ RowMutation.create(TableId.of(tableId), "r2").setCell(familyId, qualifier, largeValue));
+ client.mutateRow(
+ RowMutation.create(TableId.of(tableId), "r3").setCell(familyId, qualifier, largeValue));
+ }
+
+ // sync
+ assertThat(
+ client
+ .skipLargeRowsCallable()
+ .all()
+ .call(
+ Query.create(tableId)
+ .range(ByteStringRange.unbounded().startClosed("r1").endOpen("r3"))))
+ .containsExactly(expectedRow1);
+
+ assertThat(
+ client
+ .skipLargeRowsCallable()
+ .all()
+ .call(
+ Query.create(tableId)
+ .range(ByteStringRange.unbounded().startClosed("r1").endClosed("r4"))))
+ .containsExactly(expectedRow1, expectedRow4);
+
+    List<Row> emptyRows =
+ client
+ .skipLargeRowsCallable()
+ .all()
+ .call(
+ Query.create(tableId)
+ .range(ByteStringRange.unbounded().startClosed("r2").endClosed("r3")));
+ assertThat(emptyRows).isEmpty();
+
+    List<Row> startWithFaultyRow =
+ client
+ .skipLargeRowsCallable()
+ .all()
+ .call(
+ Query.create(tableId)
+ .range(ByteStringRange.unbounded().startClosed("r2").endClosed("r4")));
+ assertThat(startWithFaultyRow).containsExactly(expectedRow4);
+
+    List<Row> endsWithFaultyRow =
+ client
+ .skipLargeRowsCallable()
+ .all()
+ .call(
+ Query.create(tableId)
+ .range(ByteStringRange.unbounded().startClosed("r1").endClosed("r3")));
+ assertThat(endsWithFaultyRow).containsExactly(expectedRow1);
+
+ assertThat(
+ client
+ .skipLargeRowsCallable()
+ .all()
+ .call(
+ Query.create(tableId)
+ .range(ByteStringRange.unbounded().startClosed("r1").endClosed("r4"))))
+ .containsExactly(expectedRow1, expectedRow4);
+ // async
+ AccumulatingObserver observer = new AccumulatingObserver();
+ Query query = Query.create(tableId).range("r1", "r3");
+ client.skipLargeRowsCallable().call(query, observer);
+ observer.awaitCompletion();
+ assertThat(observer.responses).containsExactly(expectedRow1);
+
+ AccumulatingObserver observer2 = new AccumulatingObserver();
+ Query query2 = Query.create(tableId).range("r1", "r5");
+ client.skipLargeRowsCallable().call(query2, observer2);
+ observer2.awaitCompletion();
+ assertThat(observer2.responses).containsExactly(expectedRow1, expectedRow4);
+ }
}
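
Note (illustrative): at the client surface this integration test boils down to the sketch below,
where "my-table" is a placeholder table id; rows that exceed the server's large-row limit are
skipped instead of failing the whole scan.

ServerStream<Row> rows = client.skipLargeRowsCallable().call(Query.create("my-table"));
for (Row row : rows) {
  System.out.println(row.getKey().toStringUtf8()); // large rows never show up here
}
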
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java
index 8a8c6d7709..3ff77a3f5d 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java
@@ -19,35 +19,45 @@
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.UnavailableException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.truth.Truth;
+import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
+import com.google.rpc.ErrorInfo;
+import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -60,6 +70,8 @@ public class ReadRowsRetryTest {
private static final String PROJECT_ID = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
private static final String TABLE_ID = "fake-table";
+  private static final Metadata.Key<? super byte[]> ERROR_DETAILS_KEY =
+      Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);
@Rule public GrpcServerRule serverRule = new GrpcServerRule();
private TestBigtableService service;
@@ -124,6 +136,203 @@ public void immediateRetryTest() {
Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder();
}
+ public ApiException createLargeRowException(String rowKey) {
+ ErrorInfo errorInfo =
+ ErrorInfo.newBuilder()
+ .setReason("LargeRowReadError")
+ .setDomain("bigtable.googleapis.com")
+ .putMetadata(
+ "rowKeyBase64Encoded", Base64.getEncoder().encodeToString(rowKey.getBytes()))
+ .build();
+
+ Any packedErrorInfo = Any.pack(errorInfo);
+
+ ErrorDetails errorDetails =
+ ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(packedErrorInfo)).build();
+
+ Metadata trailers = new Metadata();
+ byte[] status =
+ com.google.rpc.Status.newBuilder().addDetails(Any.pack(errorInfo)).build().toByteArray();
+ trailers.put(ERROR_DETAILS_KEY, status);
+ return (new UnavailableException(
+ new StatusRuntimeException(Status.FAILED_PRECONDITION, trailers),
+ GrpcStatusCode.of(Code.FAILED_PRECONDITION),
+ false,
+ errorDetails));
+ }
+
+ @Test
+ public void readRowsWithLimitSkippingLargeRowsTest() {
+    // The large row is r2
+ ApiException largeRowExceptionWithTrailersR2 = createLargeRowException("r2");
+
+    List<Range<String>> rangeList;
+    List<String> actualResults;
+
+ // TEST - range end is large row || row limit
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r1", "r3"))
+ .expectRowLimit(2)
+ .respondWith("r1")
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2));
+
+ actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r1", "r3").limit(2));
+ Truth.assertThat(actualResults).containsExactly("r1").inOrder();
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r4", "r7"))
+ .expectRowLimit(2)
+ .respondWith("r4", "r5"));
+
+ actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r4", "r7").limit(2));
+ Truth.assertThat(actualResults).containsExactly("r4", "r5").inOrder();
+ }
+
+ @Test
+ public void readRowsForRowKeyWithLargeRowsTest() {
+    // The large row is r7
+    ApiException largeRowExceptionWithTrailersR7 = createLargeRowException("r7");
+    List<String> actualResults;
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("r1", "r7", "r4", "r8")
+ .respondWith("r1", "r4")
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR7));
+ service.expectations.add(RpcExpectation.create().expectRequest("r8").respondWith("r8"));
+
+ actualResults =
+ getSkipLargeRowsResults(
+ Query.create(TABLE_ID).rowKey("r1").rowKey("r7").rowKey("r4").rowKey("r8"));
+ Truth.assertThat(actualResults).containsExactly("r1", "r4", "r8").inOrder();
+ }
+
+ /**
+ * This tests if in a read rows request RowRange includes large rows, those rows are omitted in
+ * the response.
+ */
+ @Test
+ public void readRowRangeWithSkippingLargeRows() {
+
+    // Large rows are r2, r3 and r7
+ ApiException largeRowExceptionWithTrailersR2 = createLargeRowException("r2");
+ ApiException largeRowExceptionWithTrailersR3 = createLargeRowException("r3");
+ ApiException largeRowExceptionWithTrailersR7 = createLargeRowException("r7");
+
+    List<Range<String>> rangeList;
+    List<String> actualResults;
+
+ // TEST - only query for large rows - should receive an empty response
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r2", "r4"))
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2));
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.open("r2", "r4"))
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3));
+
+    rangeList = new ArrayList<Range<String>>();
+ rangeList.add(Range.open("r2", "r3"));
+ rangeList.add(Range.open("r3", "r4"));
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequestForMultipleRowRanges(rangeList)
+ .respondWithStatus(Code.OK));
+
+ actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r2", "r4"));
+ Truth.assertThat(actualResults.size()).isEqualTo(0);
+
+ // TEST - range start is large row
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r3", "r5"))
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3));
+
+ service.expectations.add(
+ RpcExpectation.create().expectRequest(Range.open("r3", "r5")).respondWith("r4"));
+
+ actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r3", "r5"));
+ Truth.assertThat(actualResults).containsExactly("r4").inOrder();
+
+ // TEST - range end is large row
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r1", "r3"))
+ .respondWith("r1")
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2));
+
+    rangeList = new ArrayList<Range<String>>();
+ rangeList.add(Range.open("r1", "r2"));
+ rangeList.add(Range.open("r2", "r3"));
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequestForMultipleRowRanges(rangeList)
+ .respondWithStatus(Code.OK));
+
+ actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r1", "r3"));
+ Truth.assertThat(actualResults).containsExactly("r1").inOrder();
+
+ // r2 faulty
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r1", "r9"))
+ .respondWith("r1")
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2));
+
+ // r3 faulty
+    rangeList = new ArrayList<Range<String>>();
+ rangeList.add(Range.open("r1", "r2"));
+ rangeList.add(Range.open("r2", "r9"));
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequestForMultipleRowRanges(rangeList)
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3));
+
+    rangeList = new ArrayList<Range<String>>();
+ rangeList.add(Range.open("r1", "r2"));
+ rangeList.add(Range.open("r2", "r3"));
+ rangeList.add(Range.open("r3", "r9"));
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequestForMultipleRowRanges(rangeList)
+ .respondWith("r4", "r5")
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR7));
+
+    rangeList = new ArrayList<Range<String>>();
+ rangeList.add(Range.open("r5", "r7"));
+ rangeList.add(Range.open("r7", "r9"));
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequestForMultipleRowRanges(rangeList)
+ .respondWith("r6", "r8"));
+
+ actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r1", "r9"));
+ Truth.assertThat(actualResults).containsExactly("r1", "r4", "r5", "r6", "r8").inOrder();
+
+ // TEST - reverse query with large rows
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.closedOpen("r3", "r7"))
+ .setReversed(true)
+ .respondWith("r6", "r5", "r4")
+ .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3));
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest(Range.open("r3", "r4"))
+ .setReversed(true)
+ .respondWithStatus(Code.OK));
+
+ actualResults =
+ getSkipLargeRowsResults(Query.create(TABLE_ID).range("r3", "r7").reversed(true));
+ Truth.assertThat(actualResults).containsExactly("r6", "r5", "r4").inOrder();
+ }
+
@Test
public void multipleRetryTest() {
service.expectations.add(
@@ -299,6 +508,15 @@ private List getResults(Query query) {
return actualValues;
}
+  private List<String> getSkipLargeRowsResults(Query query) {
+    List<String> actualRowKeys =
+ client.skipLargeRowsCallable().all().call(query).stream()
+ .map(row -> row.getKey().toStringUtf8())
+ .collect(Collectors.toList());
+
+ return actualRowKeys;
+ }
+
private static class TestBigtableService extends BigtableGrpc.BigtableImplBase {
    Queue<RpcExpectation> expectations = new LinkedBlockingDeque<>();
int i = -1;
@@ -336,6 +554,11 @@ private static class RpcExpectation {
ApiException exception;
    List<ReadRowsResponse> responses;
+ private RpcExpectation setReversed(boolean reverse) {
+ this.requestBuilder.setReversed(reverse);
+ return this;
+ }
+
private RpcExpectation() {
this.requestBuilder =
ReadRowsRequest.newBuilder()
@@ -355,6 +578,58 @@ RpcExpectation expectRequest(String... keys) {
return this;
}
+    RpcExpectation expectRequestForMultipleRowRanges(List<Range<String>> rowRanges) {
+      RowSet.Builder rowRange = requestBuilder.getRowsBuilder();
+      for (Range<String> range : rowRanges) {
+ rowRangeBuilder(range);
+ }
+ return this;
+ }
+
+    /**
+     * Adds a {@link RowRange} built from the given guava {@link Range} to the request builder.
+     *
+     * @param range the guava range to convert
+     * @return the built {@link RowRange}
+     */
+    RowRange rowRangeBuilder(Range<String> range) {
+
+ RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder();
+
+ if (range.hasLowerBound()) {
+ switch (range.lowerBoundType()) {
+ case CLOSED:
+ rowRange.setStartKeyClosed(ByteString.copyFromUtf8(range.lowerEndpoint()));
+ break;
+ case OPEN:
+ rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint()));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unexpected lowerBoundType: " + range.lowerBoundType());
+ }
+ } else {
+ rowRange.clearStartKey();
+ }
+
+ if (range.hasUpperBound()) {
+ switch (range.upperBoundType()) {
+ case CLOSED:
+ rowRange.setEndKeyClosed(ByteString.copyFromUtf8(range.upperEndpoint()));
+ break;
+ case OPEN:
+ rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint()));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unexpected upperBoundType: " + range.upperBoundType());
+ }
+ } else {
+ rowRange.clearEndKey();
+ }
+ return rowRange.build();
+ }
+
    RpcExpectation expectRequest(Range<String> range) {
RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder();
diff --git a/grpc-google-cloud-bigtable-admin-v2/pom.xml b/grpc-google-cloud-bigtable-admin-v2/pom.xml
index a7af10a335..86bc9e8d0f 100644
--- a/grpc-google-cloud-bigtable-admin-v2/pom.xml
+++ b/grpc-google-cloud-bigtable-admin-v2/pom.xml
@@ -4,13 +4,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.google.api.grpc</groupId>
   <artifactId>grpc-google-cloud-bigtable-admin-v2</artifactId>
-  <version>2.52.0</version>
+  <version>2.53.0</version>
   <name>grpc-google-cloud-bigtable-admin-v2</name>
   <description>GRPC library for grpc-google-cloud-bigtable-admin-v2</description>
   <parent>
     <groupId>com.google.cloud</groupId>
     <artifactId>google-cloud-bigtable-parent</artifactId>
-    <version>2.52.0</version>
+    <version>2.53.0</version>
   </parent>
@@ -18,14 +18,14 @@
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-deps-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
diff --git a/grpc-google-cloud-bigtable-v2/pom.xml b/grpc-google-cloud-bigtable-v2/pom.xml
index eb95a38d80..4917e77e1e 100644
--- a/grpc-google-cloud-bigtable-v2/pom.xml
+++ b/grpc-google-cloud-bigtable-v2/pom.xml
@@ -4,13 +4,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.google.api.grpc</groupId>
   <artifactId>grpc-google-cloud-bigtable-v2</artifactId>
-  <version>2.52.0</version>
+  <version>2.53.0</version>
   <name>grpc-google-cloud-bigtable-v2</name>
   <description>GRPC library for grpc-google-cloud-bigtable-v2</description>
   <parent>
     <groupId>com.google.cloud</groupId>
     <artifactId>google-cloud-bigtable-parent</artifactId>
-    <version>2.52.0</version>
+    <version>2.53.0</version>
   </parent>
@@ -18,14 +18,14 @@
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-deps-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
diff --git a/pom.xml b/pom.xml
index 27d11773ad..e5de6ecbaa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
   <artifactId>google-cloud-bigtable-parent</artifactId>
   <packaging>pom</packaging>
-  <version>2.52.0</version>
+  <version>2.53.0</version>
   <name>Google Cloud Bigtable Parent</name>
   <url>https://github.com/googleapis/java-bigtable</url>
@@ -153,27 +153,27 @@
       <dependency>
         <groupId>com.google.api.grpc</groupId>
         <artifactId>proto-google-cloud-bigtable-v2</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
       </dependency>
       <dependency>
         <groupId>com.google.api.grpc</groupId>
         <artifactId>proto-google-cloud-bigtable-admin-v2</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
       </dependency>
       <dependency>
         <groupId>com.google.api.grpc</groupId>
         <artifactId>grpc-google-cloud-bigtable-v2</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
       </dependency>
       <dependency>
         <groupId>com.google.api.grpc</groupId>
         <artifactId>grpc-google-cloud-bigtable-admin-v2</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
       </dependency>
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
       </dependency>
diff --git a/proto-google-cloud-bigtable-admin-v2/pom.xml b/proto-google-cloud-bigtable-admin-v2/pom.xml
index 8800619982..4ebb2b34e9 100644
--- a/proto-google-cloud-bigtable-admin-v2/pom.xml
+++ b/proto-google-cloud-bigtable-admin-v2/pom.xml
@@ -4,13 +4,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.google.api.grpc</groupId>
   <artifactId>proto-google-cloud-bigtable-admin-v2</artifactId>
-  <version>2.52.0</version>
+  <version>2.53.0</version>
   <name>proto-google-cloud-bigtable-admin-v2</name>
   <description>PROTO library for proto-google-cloud-bigtable-admin-v2</description>
   <parent>
     <groupId>com.google.cloud</groupId>
     <artifactId>google-cloud-bigtable-parent</artifactId>
-    <version>2.52.0</version>
+    <version>2.53.0</version>
   </parent>
@@ -18,14 +18,14 @@
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-deps-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
diff --git a/proto-google-cloud-bigtable-v2/pom.xml b/proto-google-cloud-bigtable-v2/pom.xml
index 04f3706f1b..2314933bab 100644
--- a/proto-google-cloud-bigtable-v2/pom.xml
+++ b/proto-google-cloud-bigtable-v2/pom.xml
@@ -4,13 +4,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.google.api.grpc</groupId>
   <artifactId>proto-google-cloud-bigtable-v2</artifactId>
-  <version>2.52.0</version>
+  <version>2.53.0</version>
   <name>proto-google-cloud-bigtable-v2</name>
   <description>PROTO library for proto-google-cloud-bigtable-v2</description>
   <parent>
     <groupId>com.google.cloud</groupId>
     <artifactId>google-cloud-bigtable-parent</artifactId>
-    <version>2.52.0</version>
+    <version>2.53.0</version>
   </parent>
@@ -18,14 +18,14 @@
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-deps-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-bigtable-bom</artifactId>
-        <version>2.52.0</version>
+        <version>2.53.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 9feaf0ea25..1b27e917fd 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
     <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-bigtable</artifactId>
-      <version>2.52.0</version>
+      <version>2.53.0</version>
     </dependency>
diff --git a/test-proxy/pom.xml b/test-proxy/pom.xml
index 2ea28b5ed2..248c217d9f 100644
--- a/test-proxy/pom.xml
+++ b/test-proxy/pom.xml
@@ -12,11 +12,11 @@
     <artifactId>google-cloud-bigtable-parent</artifactId>
     <groupId>com.google.cloud</groupId>
-    <version>2.52.0</version>
+    <version>2.53.0</version>
- 2.52.0
+ 2.53.0
diff --git a/versions.txt b/versions.txt
index 03ec37199f..547c9efdaa 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,10 +1,10 @@
# Format:
# module:released-version:current-version
-google-cloud-bigtable:2.52.0:2.52.0
-grpc-google-cloud-bigtable-admin-v2:2.52.0:2.52.0
-grpc-google-cloud-bigtable-v2:2.52.0:2.52.0
-proto-google-cloud-bigtable-admin-v2:2.52.0:2.52.0
-proto-google-cloud-bigtable-v2:2.52.0:2.52.0
-google-cloud-bigtable-emulator:0.189.0:0.189.0
-google-cloud-bigtable-emulator-core:0.189.0:0.189.0
+google-cloud-bigtable:2.53.0:2.53.0
+grpc-google-cloud-bigtable-admin-v2:2.53.0:2.53.0
+grpc-google-cloud-bigtable-v2:2.53.0:2.53.0
+proto-google-cloud-bigtable-admin-v2:2.53.0:2.53.0
+proto-google-cloud-bigtable-v2:2.53.0:2.53.0
+google-cloud-bigtable-emulator:0.190.0:0.190.0
+google-cloud-bigtable-emulator-core:0.190.0:0.190.0