diff --git a/CHANGELOG.md b/CHANGELOG.md
index 19b1bc9a0..3cf128b06 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,16 @@
 [1]: https://pypi.org/project/google-cloud-bigtable/#history
 
+## [1.7.3](https://github.com/googleapis/python-bigtable/compare/v1.7.2...v1.7.3) (2022-11-18)
+
+
+### Bug Fixes
+
+* First pass on making retry configuration more consistent ([#695](https://github.com/googleapis/python-bigtable/issues/695)) ([c707c30](https://github.com/googleapis/python-bigtable/commit/c707c309a4095e14ecc678f327d417d5a2a55243))
+* Make internal rst_stream errors retriable ([#699](https://github.com/googleapis/python-bigtable/issues/699)) ([770feb8](https://github.com/googleapis/python-bigtable/commit/770feb840c5bc30ebf30447e9460d581c1a5c5b0))
+* Make sure that the proper exception type is bubbled up for ReadRows ([#696](https://github.com/googleapis/python-bigtable/issues/696)) ([5c72780](https://github.com/googleapis/python-bigtable/commit/5c727802ea55611a103bca1d4fff2a3d305be758))
+* Prevent sending full table scan when retrying (backport [#554](https://github.com/googleapis/python-bigtable/issues/554)) ([#697](https://github.com/googleapis/python-bigtable/issues/697)) ([c4ae6ad](https://github.com/googleapis/python-bigtable/commit/c4ae6ad5d2ce8e495250b1df41a4ba1877dacca2))
+
 ## [1.7.2](https://github.com/googleapis/python-bigtable/compare/v1.7.1...v1.7.2) (2022-06-07)
 
diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py
index c37ef7723..0a0bd96e4 100644
--- a/google/cloud/bigtable/row_data.py
+++ b/google/cloud/bigtable/row_data.py
@@ -16,6 +16,9 @@
 
 import copy
+import time
+
+import google.api_core.exceptions
 
 import six
 
 import grpc
@@ -329,9 +332,11 @@ class InvalidChunk(RuntimeError):
     """Exception raised due to invalid chunk data from back-end."""
 
 
+class InvalidRetryRequest(RuntimeError):
+    """Exception raised when a retry request is invalid."""
+
+
 def _retry_read_rows_exception(exc):
-    if isinstance(exc, grpc.RpcError):
-        exc = exceptions.from_grpc_error(exc)
     return isinstance(exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded))
 
 
@@ -340,6 +345,10 @@
    initial=1.0,
    maximum=15.0,
    multiplier=2.0,
+    # NOTE: this is a soft read timeout: it limits how long we are willing to
+    # schedule retry attempts to read the next row. It does not set the RPC
+    # timeout. Please use the separate overall_timeout parameter of read_rows
+    # to limit the overall operation duration.
    deadline=60.0,  # 60 seconds
 )
 """The default retry strategy to be used on retry-able errors.
@@ -387,7 +396,14 @@ class PartialRowsData(object):
         STATE_CELL_IN_PROGRESS: CELL_IN_PROGRESS,
     }
 
-    def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
+    def __init__(
+        self,
+        read_method,
+        request,
+        retry=DEFAULT_RETRY_READ_ROWS,
+        attempt_timeout=None,
+        overall_timeout=None,
+    ):
         # Counter for rows returned to the user
         self._counter = 0
         # In-progress row, unset until first response, after commit/reset
@@ -404,14 +420,14 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
         self.read_method = read_method
         self.request = request
         self.retry = retry
+        self._attempt_timeout = attempt_timeout
+        # absolute timestamp when all retry attempts should end
+        if overall_timeout:
+            self._overall_deadline = time.time() + overall_timeout
+        else:
+            self._overall_deadline = None
 
-        # The `timeout` parameter must be somewhat greater than the value
-        # contained in `self.retry`, in order to avoid race-like condition and
-        # allow registering the first deadline error before invoking the retry.
-        # Otherwise there is a risk of entering an infinite loop that resets
-        # the timeout counter just before it being triggered. The increment
-        # by 1 second here is customary but should not be much less than that.
-        self.response_iterator = read_method(request, timeout=self.retry._deadline + 1)
+        self.response_iterator = self._create_read_stream(request)
         self.rows = {}
         self._state = self.STATE_NEW_ROW
@@ -449,6 +465,28 @@ class as a generator instead.
         for row in self:
             self.rows[row.row_key] = row
 
+    @property
+    def remaining_overall_timeout(self):
+        """Returns the remaining deadline allotted for the entire stream.
+        Returns a float of seconds."""
+        if not self._overall_deadline:
+            return None
+
+        return self._overall_deadline - time.time()
+
+    def _create_read_stream(self, req):
+        """Starts a new RPC bounded by the overall deadline and attempt timeout.
+
+        :type req: :class:`data_messages_v2_pb2.ReadRowsRequest`
+        """
+        effective_timeout = self.remaining_overall_timeout
+        if effective_timeout is None:
+            effective_timeout = self._attempt_timeout
+        elif self._attempt_timeout is not None:
+            effective_timeout = min(effective_timeout, self._attempt_timeout)
+
+        return self.read_method(req, timeout=effective_timeout)
+
     def _create_retry_request(self):
         """Helper for :meth:`__iter__`."""
         req_manager = _ReadRowsRequestManager(
@@ -463,14 +501,39 @@ def _on_error(self, exc):
         if self.last_scanned_row_key:
             retry_request = self._create_retry_request()
 
-        self.response_iterator = self.read_method(retry_request)
+        self.response_iterator = self._create_read_stream(retry_request)
 
     def _read_next(self):
         """Helper for :meth:`__iter__`."""
-        return six.next(self.response_iterator)
+        try:
+            return six.next(self.response_iterator)
+        except grpc.RpcError as grpc_error:
+            # TODO: this needs to be moved to a more general location
+            # (i.e. an interceptor)
+            e = exceptions.from_grpc_error(grpc_error)
+            # Sometimes GOAWAYs are surfaced as INTERNAL errors, which makes
+            # them unretriable. This patches that behavior.
+            if e.grpc_status_code == grpc.StatusCode.INTERNAL and (
+                "rst_stream" in e.message.lower() or "rst stream" in e.message.lower()
+            ):
+                raise google.api_core.exceptions.ServiceUnavailable(e.message)
+            raise e
 
     def _read_next_response(self):
         """Helper for :meth:`__iter__`."""
+        # Calculate the maximum amount of time for which retries should be
+        # scheduled. This will not actually set any deadlines; it only limits
+        # the duration of time that we are willing to schedule retries for.
+        remaining_overall_timeout = self.remaining_overall_timeout
+
+        if remaining_overall_timeout is not None:
+            # We want to make sure that the retry logic doesn't retry after
+            # the operation deadline has passed.
+            if (
+                self.retry.deadline is None
+                or self.retry.deadline > remaining_overall_timeout
+            ):
+                self.retry = self.retry.with_deadline(remaining_overall_timeout)
+
         return self.retry(self._read_next, on_error=self._on_error)()
 
     def __iter__(self):
@@ -487,6 +550,9 @@ def __iter__(self):
                 if self.state != self.NEW_ROW:
                     raise ValueError("The row remains partial / is not committed.")
                 break
+            except InvalidRetryRequest:
+                self._cancelled = True
+                break
 
             for chunk in response.chunks:
                 if self._cancelled:
@@ -625,29 +691,38 @@ def __init__(self, message, last_scanned_key, rows_read_so_far):
 
     def build_updated_request(self):
         """Updates the given message request as per last scanned key"""
-        r_kwargs = {
-            "table_name": self.message.table_name,
-            "filter": self.message.filter,
-        }
+
+        resume_request = data_messages_v2_pb2.ReadRowsRequest()
+        data_messages_v2_pb2.ReadRowsRequest.CopyFrom(resume_request, self.message)
+        resume_request.rows.Clear()
 
         if self.message.rows_limit != 0:
-            r_kwargs["rows_limit"] = max(
-                1, self.message.rows_limit - self.rows_read_so_far
-            )
+            row_limit_remaining = self.message.rows_limit - self.rows_read_so_far
+            if row_limit_remaining > 0:
+                resume_request.rows_limit = row_limit_remaining
+            else:
+                raise InvalidRetryRequest
 
         # if neither RowSet.row_keys nor RowSet.row_ranges currently exist,
         # add row_range that starts with last_scanned_key as start_key_open
         # to request only rows that have not been returned yet
         if not self.message.HasField("rows"):
             row_range = data_v2_pb2.RowRange(start_key_open=self.last_scanned_key)
-            r_kwargs["rows"] = data_v2_pb2.RowSet(row_ranges=[row_range])
+            resume_request.rows.row_ranges.add().CopyFrom(row_range)
         else:
             row_keys = self._filter_rows_keys()
             row_ranges = self._filter_row_ranges()
-            r_kwargs["rows"] = data_v2_pb2.RowSet(
-                row_keys=row_keys, row_ranges=row_ranges
-            )
-        return data_messages_v2_pb2.ReadRowsRequest(**r_kwargs)
+
+            if len(row_keys) == 0 and len(row_ranges) == 0:
+                # Avoid sending empty row_keys and row_ranges
+                # if that was not the intention
+                raise InvalidRetryRequest
+
+            resume_request.rows.row_keys[:] = row_keys
+            for rr in row_ranges:
+                resume_request.rows.row_ranges.add().CopyFrom(rr)
+
+        return resume_request
 
     def _filter_rows_keys(self):
         """Helper for :meth:`build_updated_request`"""
diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py
index 887b74b02..45481f350 100644
--- a/google/cloud/bigtable/table.py
+++ b/google/cloud/bigtable/table.py
@@ -481,7 +481,7 @@ def get_cluster_states(self):
             for cluster_id, value_pb in table_pb.cluster_states.items()
         }
 
-    def read_row(self, row_key, filter_=None):
+    def read_row(self, row_key, filter_=None, overall_timeout=60):
         """Read a single row from this table.
 
         For example:
 
@@ -506,7 +506,11 @@ def read_row(self, row_key, filter_=None, overall_timeout=60):
         """
         row_set = RowSet()
         row_set.add_row_key(row_key)
-        result_iter = iter(self.read_rows(filter_=filter_, row_set=row_set))
+        result_iter = iter(
+            self.read_rows(
+                filter_=filter_, row_set=row_set, overall_timeout=overall_timeout
+            )
+        )
         row = next(result_iter, None)
         if next(result_iter, None) is not None:
             raise ValueError("More than one row was returned.")
@@ -521,6 +525,8 @@ def read_rows(
         end_inclusive=False,
         row_set=None,
         retry=DEFAULT_RETRY_READ_ROWS,
+        attempt_timeout=None,
+        overall_timeout=None,
     ):
         """Read rows from this table.
 
@@ -565,7 +571,22 @@
             default value :attr:`DEFAULT_RETRY_READ_ROWS` can be used and
             modified with the :meth:`~google.api_core.retry.Retry.with_delay`
             method or the :meth:`~google.api_core.retry.Retry.with_deadline`
-            method.
+            method. This retry object is used to try to fetch the next row:
+            this means that the deadline specified by this object is reset
+            after every row read. Furthermore, this deadline is loosely
+            enforced: it will only prevent additional attempts from being
+            scheduled after the deadline; it will not limit how long a single
+            attempt to read the next row will run. Prefer to use
+            overall_timeout below.
+
+
+        :type attempt_timeout: float
+        :param attempt_timeout: (Optional) the attempt timeout to execute a
+            single RPC. If this attempt fails and there is overall_timeout
+            left, another attempt will be sent.
+
+        :type overall_timeout: float
+        :param overall_timeout: (Optional) the overall operation deadline to
+            completely read the entire ReadRows stream.
 
         :rtype: :class:`.PartialRowsData`
         :returns: A :class:`.PartialRowsData` a generator for consuming
@@ -582,7 +603,13 @@
             row_set=row_set,
         )
         data_client = self._instance._client.table_data_client
-        return PartialRowsData(data_client.transport.read_rows, request_pb, retry)
+        return PartialRowsData(
+            data_client.transport.read_rows,
+            request_pb,
+            retry,
+            attempt_timeout=attempt_timeout,
+            overall_timeout=overall_timeout,
+        )
 
     def yield_rows(self, **kwargs):
         """Read rows from this table.
 
@@ -615,6 +642,15 @@ def yield_rows(self, **kwargs):
         :param row_set: (Optional) The row set containing multiple row keys and
             row_ranges.
 
+        :type attempt_timeout: float
+        :param attempt_timeout: (Optional) the attempt timeout to execute a
+            single RPC. If this attempt fails and there is overall_timeout
+            left, another attempt will be sent.
+
+        :type overall_timeout: float
+        :param overall_timeout: (Optional) the overall operation deadline to
+            completely read the entire ReadRows stream.
+
         :rtype: :class:`.PartialRowData`
         :returns: A :class:`.PartialRowData` for each row returned
         """
diff --git a/setup.py b/setup.py
index 285932e12..871ec2e54 100644
--- a/setup.py
+++ b/setup.py
@@ -22,7 +22,7 @@
 
 name = 'google-cloud-bigtable'
 description = 'Google Cloud Bigtable API client library'
-version = "1.7.2"
+version = "1.7.3"
 # Should be one of:
 # 'Development Status :: 3 - Alpha'
 # 'Development Status :: 4 - Beta'
diff --git a/tests/unit/test_row_data.py b/tests/unit/test_row_data.py
index 8b3f48b78..17ab46839 100644
--- a/tests/unit/test_row_data.py
+++ b/tests/unit/test_row_data.py
@@ -336,20 +336,6 @@ def test_w_miss_wrapped_in_grpc(self):
         exception = self._make_grpc_call_error(wrapped)
         self.assertFalse(self._call_fut(exception))
 
-    def test_w_service_unavailable_wrapped_in_grpc(self):
-        from google.api_core.exceptions import ServiceUnavailable
-
-        wrapped = ServiceUnavailable("testing")
-        exception = self._make_grpc_call_error(wrapped)
-        self.assertTrue(self._call_fut(exception))
-
-    def test_w_deadline_exceeded_wrapped_in_grpc(self):
-        from google.api_core.exceptions import DeadlineExceeded
-
-        wrapped = DeadlineExceeded("testing")
-        exception = self._make_grpc_call_error(wrapped)
-        self.assertTrue(self._call_fut(exception))
-
 
 class TestPartialRowsData(unittest.TestCase):
     ROW_KEY = b"row-key"
@@ -384,20 +370,26 @@ def test_constructor(self):
         self.assertEqual(partial_rows_data.rows, {})
         self.assertEqual(partial_rows_data.retry, DEFAULT_RETRY_READ_ROWS)
 
-    def test_constructor_with_retry(self):
-        from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS
-
+    def test_constructor_with_overall_timeout(self):
         client = _Client()
         client._data_stub = mock.MagicMock()
         request = object()
-        retry = DEFAULT_RETRY_READ_ROWS
-        partial_rows_data = self._make_one(client._data_stub.ReadRows, request, retry)
-        partial_rows_data.read_method.assert_called_once_with(
-            request, timeout=DEFAULT_RETRY_READ_ROWS.deadline + 1
+        partial_rows_data = self._make_one(
+            client._data_stub.ReadRows, request, overall_timeout=11
         )
+        partial_rows_data.read_method.assert_called_once_with(request, timeout=mock.ANY)
+
+        # The deadline being passed to the first RPC should be slightly less
+        # than 11, but to avoid flakiness on slow test runners, it's padded
+        # down by 3 secs.
+        self.assertLess(8, partial_rows_data.read_method.call_args.kwargs["timeout"])
+
         self.assertIs(partial_rows_data.request, request)
         self.assertEqual(partial_rows_data.rows, {})
-        self.assertEqual(partial_rows_data.retry, retry)
+        # The remaining deadline should be slightly less than 11; to avoid
+        # flakiness on slow test runners, it's padded down by 3 secs.
+        self.assertLess(8, partial_rows_data.remaining_overall_timeout)
+        self.assertLessEqual(partial_rows_data.remaining_overall_timeout, 11)
 
     def test___eq__(self):
         client = _Client()
@@ -855,7 +847,7 @@ def test_build_updated_request_full_table(self):
         request_manager = self._make_one(request, last_scanned_key, 2)
         result = request_manager.build_updated_request()
 
-        expected_result = _ReadRowsRequestPB(table_name=self.table_name, filter={})
+        expected_result = _ReadRowsRequestPB(table_name=self.table_name)
         expected_result.rows.row_ranges.add(start_key_open=last_scanned_key)
 
         self.assertEqual(expected_result, result)
@@ -940,9 +932,7 @@ def test_build_updated_request_rows_limit(self):
         request_manager = self._make_one(request, last_scanned_key, 2)
         result = request_manager.build_updated_request()
 
-        expected_result = _ReadRowsRequestPB(
-            table_name=self.table_name, filter={}, rows_limit=8
-        )
+        expected_result = _ReadRowsRequestPB(table_name=self.table_name, rows_limit=8)
         expected_result.rows.row_ranges.add(start_key_open=last_scanned_key)
 
         self.assertEqual(expected_result, result)
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index 1793a8602..4b42fc3a6 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -11,8 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import time
 import unittest
 
 import mock
@@ -798,17 +797,12 @@ def mock_create_row_request(table_name, **kwargs):
 
     def test_read_retry_rows(self):
         from google.cloud.bigtable_v2.gapic import bigtable_client
-        from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
         from google.api_core import retry
 
         data_api = bigtable_client.BigtableClient(mock.Mock())
-        table_api = bigtable_table_admin_client.BigtableTableAdminClient(mock.Mock())
         credentials = _make_credentials()
-        client = self._make_client(
-            project="project-id", credentials=credentials, admin=True
-        )
+        client = self._make_client(project="project-id", credentials=credentials)
         client._table_data_client = data_api
-        client._table_admin_client = table_api
         instance = client.instance(instance_id=self.INSTANCE_ID)
         table = self._make_one(self.TABLE_ID, instance)
@@ -857,6 +851,77 @@
         result = rows[1]
         self.assertEqual(result.row_key, self.ROW_KEY_2)
 
+    def test_read_retry_rows_timeouts(self):
+        from google.cloud.bigtable_v2.gapic import bigtable_client
+        # needed below by DelayedFailureIterator
+        from google.api_core.exceptions import DeadlineExceeded
+
+        data_api = bigtable_client.BigtableClient(mock.Mock())
+        credentials = _make_credentials()
+        client = self._make_client(project="project-id", credentials=credentials)
+        client._table_data_client = data_api
+        instance = client.instance(instance_id=self.INSTANCE_ID)
+        table = self._make_one(self.TABLE_ID, instance)
+
+        # Patch the stub used by the API method.
+        client._table_data_client.transport.read_rows = mock.Mock(
+            side_effect=[_MockReadRowsIterator()]
+        )
+
+        # By default there is no timeout
+        list(table.read_rows())
+        self.assertIsNone(
+            client._table_data_client.transport.read_rows.call_args.kwargs["timeout"]
+        )
+
+        # attempt timeout should be passed through
+        client._table_data_client.transport.read_rows = mock.Mock(
+            side_effect=[_MockReadRowsIterator()]
+        )
+        list(table.read_rows(attempt_timeout=1.0))
+        self.assertEqual(
+            1.0,
+            client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
+        )
+
+        # overall timeout should be passed through
+        client._table_data_client.transport.read_rows = mock.Mock(
+            side_effect=[_MockReadRowsIterator()]
+        )
+        list(table.read_rows(overall_timeout=10.0))
+        # The RPC timeout should be slightly less than 10.0, but to avoid test
+        # flakiness it's padded by a couple of secs.
+        self.assertLess(
+            8.0,
+            client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
+        )
+
+        # attempt timeout caps the RPC timeout when it is shorter than the
+        # remaining overall timeout
+        client._table_data_client.transport.read_rows = mock.Mock(
+            side_effect=[_MockReadRowsIterator()]
+        )
+        list(table.read_rows(attempt_timeout=5.0, overall_timeout=10.0))
+        self.assertLessEqual(
+            5.0,
+            client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
+        )
+
+        # attempt timeout is truncated by overall timeout
+        class DelayedFailureIterator(object):
+            def next(self):
+                time.sleep(0.75)
+                raise DeadlineExceeded("delayed error")
+
+            __next__ = next
+
+        client._table_data_client.transport.read_rows = mock.Mock(
+            side_effect=[DelayedFailureIterator(), _MockReadRowsIterator()]
+        )
+        list(table.read_rows(attempt_timeout=1.0, overall_timeout=1.0))
+
+        self.assertGreater(
+            1.0,
+            client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
+        )
+
     def test_yield_retry_rows(self):
         from google.cloud.bigtable_v2.gapic import bigtable_client
         from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
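Usage note (illustrative, not part of the patch): the sketch below shows how the two new `read_rows` knobs introduced in this change are meant to interact — `attempt_timeout` bounds each individual ReadRows RPC, while `overall_timeout` bounds the whole operation across retries. The project, instance, and table IDs are hypothetical placeholders.

```python
# Minimal sketch against the v1.7.3 surface; "my-project", "my-instance",
# and "my-table" are placeholders, and credential setup is elided.
from google.api_core.exceptions import DeadlineExceeded
from google.cloud import bigtable

client = bigtable.Client(project="my-project")
table = client.instance("my-instance").table("my-table")

try:
    # Each ReadRows RPC attempt is capped at 5s; the whole read, including
    # any retries after transient errors, is capped at 30s.
    for row in table.read_rows(attempt_timeout=5.0, overall_timeout=30.0):
        print(row.row_key)
except DeadlineExceeded:
    # Surfaced once the overall deadline is exhausted across retry attempts.
    pass
```

Per `_create_read_stream` above, each RPC is sent with the minimum of `attempt_timeout` and the remaining overall time, so the overall deadline also truncates the final attempt.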