15
15
16
16
from __future__ import annotations
17
17
18
- from typing import Sequence
18
+ from typing import (
19
+ TYPE_CHECKING ,
20
+ AsyncGenerator ,
21
+ AsyncIterable ,
22
+ Awaitable ,
23
+ Sequence ,
24
+ )
19
25
20
26
from google .cloud .bigtable_v2 .types import ReadRowsRequest as ReadRowsRequestPB
21
27
from google .cloud .bigtable_v2 .types import ReadRowsResponse as ReadRowsResponsePB
26
32
from google .cloud .bigtable .data .read_rows_query import ReadRowsQuery
27
33
from google .cloud .bigtable .data .exceptions import InvalidChunk
28
34
from google .cloud .bigtable .data .exceptions import _RowSetComplete
29
- from google .cloud .bigtable .data .exceptions import _ResetRow
30
35
from google .cloud .bigtable .data ._helpers import _attempt_timeout_generator
31
36
from google .cloud .bigtable .data ._helpers import _make_metadata
32
37
from google .cloud .bigtable .data ._helpers import _retry_exception_factory
33
38
34
39
from google .api_core import retry as retries
35
40
from google .api_core .retry import exponential_sleep_generator
36
41
37
- from google .cloud .bigtable .data ._sync .cross_sync import CrossSync
42
+ if TYPE_CHECKING :
43
+ from google .cloud .bigtable .data ._async .client import TableAsync
44
+
45
+
46
+ class _ResetRow (Exception ):
47
+ def __init__ (self , chunk ):
48
+ self .chunk = chunk
38
49
39
50
40
- @CrossSync .export_sync (
41
- path = "google.cloud.bigtable.data._sync._read_rows._ReadRowsOperation" ,
42
- add_mapping_for_name = "_ReadRowsOperation" ,
43
- )
44
51
class _ReadRowsOperationAsync :
45
52
"""
46
53
ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
@@ -75,7 +82,7 @@ class _ReadRowsOperationAsync:
75
82
def __init__ (
76
83
self ,
77
84
query : ReadRowsQuery ,
78
- table : "CrossSync.Table " ,
85
+ table : "TableAsync " ,
79
86
operation_timeout : float ,
80
87
attempt_timeout : float ,
81
88
retryable_exceptions : Sequence [type [Exception ]] = (),
@@ -101,22 +108,22 @@ def __init__(
101
108
self ._last_yielded_row_key : bytes | None = None
102
109
self ._remaining_count : int | None = self .request .rows_limit or None
103
110
104
- def start_operation (self ) -> CrossSync . Iterable [Row ]:
111
+ def start_operation (self ) -> AsyncGenerator [Row , None ]:
105
112
"""
106
113
Start the read_rows operation, retrying on retryable errors.
107
114
108
115
Yields:
109
116
Row: The next row in the stream
110
117
"""
111
- return CrossSync . retry_target_stream (
118
+ return retries . retry_target_stream_async (
112
119
self ._read_rows_attempt ,
113
120
self ._predicate ,
114
121
exponential_sleep_generator (0.01 , 60 , multiplier = 2 ),
115
122
self .operation_timeout ,
116
123
exception_factory = _retry_exception_factory ,
117
124
)
118
125
119
- def _read_rows_attempt (self ) -> CrossSync . Iterable [Row ]:
126
+ def _read_rows_attempt (self ) -> AsyncGenerator [Row , None ]:
120
127
"""
121
128
Attempt a single read_rows rpc call.
122
129
This function is intended to be wrapped by retry logic,
@@ -152,10 +159,9 @@ def _read_rows_attempt(self) -> CrossSync.Iterable[Row]:
152
159
chunked_stream = self .chunk_stream (gapic_stream )
153
160
return self .merge_rows (chunked_stream )
154
161
155
- @CrossSync .convert
156
162
async def chunk_stream (
157
- self , stream : CrossSync . Awaitable [CrossSync . Iterable [ReadRowsResponsePB ]]
158
- ) -> CrossSync . Iterable [ReadRowsResponsePB .CellChunk ]:
163
+ self , stream : Awaitable [AsyncIterable [ReadRowsResponsePB ]]
164
+ ) -> AsyncGenerator [ReadRowsResponsePB .CellChunk , None ]:
159
165
"""
160
166
process chunks out of raw read_rows stream
161
167
@@ -164,7 +170,7 @@ async def chunk_stream(
164
170
Yields:
165
171
ReadRowsResponsePB.CellChunk: the next chunk in the stream
166
172
"""
167
- async for resp in CrossSync . rm_aio ( await stream ) :
173
+ async for resp in await stream :
168
174
# extract proto from proto-plus wrapper
169
175
resp = resp ._pb
170
176
@@ -205,12 +211,9 @@ async def chunk_stream(
205
211
current_key = None
206
212
207
213
@staticmethod
208
- @CrossSync .convert (
209
- replace_symbols = {"__aiter__" : "__iter__" , "__anext__" : "__next__" }
210
- )
211
214
async def merge_rows (
212
- chunks : CrossSync . Iterable [ReadRowsResponsePB .CellChunk ] | None ,
213
- ) -> CrossSync . Iterable [Row ]:
215
+ chunks : AsyncGenerator [ReadRowsResponsePB .CellChunk , None ] | None
216
+ ) -> AsyncGenerator [Row , None ]:
214
217
"""
215
218
Merge chunks into rows
216
219
@@ -225,8 +228,8 @@ async def merge_rows(
225
228
# For each row
226
229
while True :
227
230
try :
228
- c = CrossSync . rm_aio ( await it .__anext__ () )
229
- except CrossSync . StopIteration :
231
+ c = await it .__anext__ ()
232
+ except StopAsyncIteration :
230
233
# stream complete
231
234
return
232
235
row_key = c .row_key
@@ -274,7 +277,7 @@ async def merge_rows(
274
277
buffer = [value ]
275
278
while c .value_size > 0 :
276
279
# throws when premature end
277
- c = CrossSync . rm_aio ( await it .__anext__ () )
280
+ c = await it .__anext__ ()
278
281
279
282
t = c .timestamp_micros
280
283
cl = c .labels
@@ -306,7 +309,7 @@ async def merge_rows(
306
309
if c .commit_row :
307
310
yield Row (row_key , cells )
308
311
break
309
- c = CrossSync . rm_aio ( await it .__anext__ () )
312
+ c = await it .__anext__ ()
310
313
except _ResetRow as e :
311
314
c = e .chunk
312
315
if (
@@ -319,7 +322,7 @@ async def merge_rows(
319
322
):
320
323
raise InvalidChunk ("reset row with data" )
321
324
continue
322
- except CrossSync . StopIteration :
325
+ except StopAsyncIteration :
323
326
raise InvalidChunk ("premature end of stream" )
324
327
325
328
@staticmethod
0 commit comments