Skip to content

Commit c4ae6ad

Browse files
fix: Prevent sending full table scan when retrying (backport #554) (#697)
Update the retry logic. Don't send empty row_key and empty row_ranges if the original message didn't ask for those. Closes internal issue 214449800 * Create InvalidRetryRequest exception. Raise InvalidRetryRequest instead of StopIteration Catch the InvalidRetryRequest Handle stop the retry request if row_limit has been reached.
1 parent 14415e9 commit c4ae6ad

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed

google/cloud/bigtable/row_data.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,10 @@ class InvalidChunk(RuntimeError):
329329
"""Exception raised to to invalid chunk data from back-end."""
330330

331331

332+
class InvalidRetryRequest(RuntimeError):
333+
"""Exception raised when retry request is invalid."""
334+
335+
332336
def _retry_read_rows_exception(exc):
333337
if isinstance(exc, grpc.RpcError):
334338
exc = exceptions.from_grpc_error(exc)
@@ -487,6 +491,9 @@ def __iter__(self):
487491
if self.state != self.NEW_ROW:
488492
raise ValueError("The row remains partial / is not committed.")
489493
break
494+
except InvalidRetryRequest:
495+
self._cancelled = True
496+
break
490497

491498
for chunk in response.chunks:
492499
if self._cancelled:
@@ -625,29 +632,38 @@ def __init__(self, message, last_scanned_key, rows_read_so_far):
625632

626633
def build_updated_request(self):
627634
"""Updates the given message request as per last scanned key"""
628-
r_kwargs = {
629-
"table_name": self.message.table_name,
630-
"filter": self.message.filter,
631-
}
635+
636+
resume_request = data_messages_v2_pb2.ReadRowsRequest()
637+
data_messages_v2_pb2.ReadRowsRequest.CopyFrom(resume_request, self.message)
638+
resume_request.rows.Clear()
632639

633640
if self.message.rows_limit != 0:
634-
r_kwargs["rows_limit"] = max(
635-
1, self.message.rows_limit - self.rows_read_so_far
636-
)
641+
row_limit_remaining = self.message.rows_limit - self.rows_read_so_far
642+
if row_limit_remaining > 0:
643+
resume_request.rows_limit = row_limit_remaining
644+
else:
645+
raise InvalidRetryRequest
637646

638647
# if neither RowSet.row_keys nor RowSet.row_ranges currently exist,
639648
# add row_range that starts with last_scanned_key as start_key_open
640649
# to request only rows that have not been returned yet
641650
if not self.message.HasField("rows"):
642651
row_range = data_v2_pb2.RowRange(start_key_open=self.last_scanned_key)
643-
r_kwargs["rows"] = data_v2_pb2.RowSet(row_ranges=[row_range])
652+
resume_request.rows.row_ranges.add().CopyFrom(row_range)
644653
else:
645654
row_keys = self._filter_rows_keys()
646655
row_ranges = self._filter_row_ranges()
647-
r_kwargs["rows"] = data_v2_pb2.RowSet(
648-
row_keys=row_keys, row_ranges=row_ranges
649-
)
650-
return data_messages_v2_pb2.ReadRowsRequest(**r_kwargs)
656+
657+
if len(row_keys) == 0 and len(row_ranges) == 0:
658+
# Avoid sending empty row_keys and row_ranges
659+
# if that was not the intention
660+
raise InvalidRetryRequest
661+
662+
resume_request.rows.row_keys[:] = row_keys
663+
for rr in row_ranges:
664+
resume_request.rows.row_ranges.add().CopyFrom(rr)
665+
666+
return resume_request
651667

652668
def _filter_rows_keys(self):
653669
"""Helper for :meth:`build_updated_request`"""

tests/unit/test_row_data.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ def test_build_updated_request_full_table(self):
855855
request_manager = self._make_one(request, last_scanned_key, 2)
856856

857857
result = request_manager.build_updated_request()
858-
expected_result = _ReadRowsRequestPB(table_name=self.table_name, filter={})
858+
expected_result = _ReadRowsRequestPB(table_name=self.table_name)
859859
expected_result.rows.row_ranges.add(start_key_open=last_scanned_key)
860860
self.assertEqual(expected_result, result)
861861

@@ -940,9 +940,7 @@ def test_build_updated_request_rows_limit(self):
940940
request_manager = self._make_one(request, last_scanned_key, 2)
941941

942942
result = request_manager.build_updated_request()
943-
expected_result = _ReadRowsRequestPB(
944-
table_name=self.table_name, filter={}, rows_limit=8
945-
)
943+
expected_result = _ReadRowsRequestPB(table_name=self.table_name, rows_limit=8)
946944
expected_result.rows.row_ranges.add(start_key_open=last_scanned_key)
947945
self.assertEqual(expected_result, result)
948946

0 commit comments

Comments
 (0)