Skip to content

Commit 3e5df35

Browse files
authored
feat: use generator for stream results (#926)
* feat: use iterator for query results * use iterator as suggested in comments * remove unnecessary class * more iterators * undo adding await * address comments * undo bundle change * undo bundle change * cleanups and docstrings * fix type hint * unit tests * lint * skip tests with anext for python < 3.10 * lint * address comments * lint * fix type hint * type hint * sys test debug * sys test debug * undo change for debug * address comment * system test debug * undo system test debug code
1 parent 104293b commit 3e5df35

16 files changed

+703
-175
lines changed

google/cloud/firestore_v1/aggregation.py

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,14 @@
3030
BaseAggregationQuery,
3131
_query_response_to_result,
3232
)
33+
from google.cloud.firestore_v1.base_document import DocumentSnapshot
34+
from google.cloud.firestore_v1.stream_generator import StreamGenerator
3335

34-
from typing import Generator, Union, List, Any
36+
from typing import Any, Generator, List, Optional, TYPE_CHECKING, Union
37+
38+
# Types needed only for Type Hints
39+
if TYPE_CHECKING:
40+
from google.cloud.firestore_v1 import transaction # pragma: NO COVER
3541

3642

3743
class AggregationQuery(BaseAggregationQuery):
@@ -99,36 +105,34 @@ def _retry_query_after_exception(self, exc, retry, transaction):
99105

100106
return False
101107

102-
def stream(
108+
def _make_stream(
103109
self,
104-
transaction=None,
105-
retry: Union[
106-
retries.Retry, None, gapic_v1.method._MethodDefault
107-
] = gapic_v1.method.DEFAULT,
108-
timeout: float | None = None,
110+
transaction: Optional[transaction.Transaction] = None,
111+
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
112+
timeout: Optional[float] = None,
109113
) -> Union[Generator[List[AggregationResult], Any, None]]:
110-
"""Runs the aggregation query.
114+
"""Internal method for stream(). Runs the aggregation query.
111115
112-
This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
113-
consumes each document returned in the stream of ``RunAggregationQueryResponse``
114-
messages.
116+
This sends a ``RunAggregationQuery`` RPC and then returns a generator
117+
which consumes each document returned in the stream of
118+
``RunAggregationQueryResponse`` messages.
115119
116-
If a ``transaction`` is used and it already has write operations
117-
added, this method cannot be used (i.e. read-after-write is not
118-
allowed).
120+
If a ``transaction`` is used and it already has write operations added,
121+
this method cannot be used (i.e. read-after-write is not allowed).
119122
120123
Args:
121124
transaction
122125
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
123126
An existing transaction that this query will run in.
124-
retry (google.api_core.retry.Retry): Designation of what errors, if any,
125-
should be retried. Defaults to a system-specified policy.
126-
timeout (float): The timeout for this request. Defaults to a
127-
system-specified value.
127+
retry (Optional[google.api_core.retry.Retry]): Designation of what
128+
errors, if any, should be retried. Defaults to a
129+
system-specified policy.
130+
timeout (Optional[float]): The timeout for this request. Defaults
131+
to a system-specified value.
128132
129133
Yields:
130134
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
131-
The result of aggregations of this query
135+
The result of aggregations of this query.
132136
"""
133137

134138
response_iterator = self._get_stream_iterator(
@@ -154,3 +158,38 @@ def stream(
154158
break
155159
result = _query_response_to_result(response)
156160
yield result
161+
162+
def stream(
163+
self,
164+
transaction: Optional["transaction.Transaction"] = None,
165+
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
166+
timeout: Optional[float] = None,
167+
) -> "StreamGenerator[DocumentSnapshot]":
168+
"""Runs the aggregation query.
169+
170+
This sends a ``RunAggregationQuery`` RPC and then returns a generator
171+
which consumes each document returned in the stream of
172+
``RunAggregationQueryResponse`` messages.
173+
174+
If a ``transaction`` is used and it already has write operations added,
175+
this method cannot be used (i.e. read-after-write is not allowed).
176+
177+
Args:
178+
transaction
179+
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
180+
An existing transaction that this query will run in.
181+
retry (Optional[google.api_core.retry.Retry]): Designation of what
182+
errors, if any, should be retried. Defaults to a
183+
system-specified policy.
184+
timeout (Optinal[float]): The timeout for this request. Defaults
185+
to a system-specified value.
186+
187+
Returns:
188+
`StreamGenerator[DocumentSnapshot]`: A generator of the query results.
189+
"""
190+
inner_generator = self._make_stream(
191+
transaction=transaction,
192+
retry=retry,
193+
timeout=timeout,
194+
)
195+
return StreamGenerator(inner_generator)

google/cloud/firestore_v1/async_aggregation.py

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@
2323
from google.api_core import gapic_v1
2424
from google.api_core import retry_async as retries
2525

26-
from typing import List, Union, AsyncGenerator
27-
26+
from typing import AsyncGenerator, List, Optional, Union, TYPE_CHECKING
2827

28+
from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
2929
from google.cloud.firestore_v1.base_aggregation import (
3030
AggregationResult,
3131
_query_response_to_result,
3232
BaseAggregationQuery,
3333
)
34+
from google.cloud.firestore_v1 import transaction
35+
36+
if TYPE_CHECKING: # pragma: NO COVER
37+
from google.cloud.firestore_v1.base_document import DocumentSnapshot
3438

3539

3640
class AsyncAggregationQuery(BaseAggregationQuery):
@@ -76,17 +80,15 @@ async def get(
7680
result = [aggregation async for aggregation in stream_result]
7781
return result # type: ignore
7882

79-
async def stream(
83+
async def _make_stream(
8084
self,
81-
transaction=None,
82-
retry: Union[
83-
retries.AsyncRetry, None, gapic_v1.method._MethodDefault
84-
] = gapic_v1.method.DEFAULT,
85-
timeout: float | None = None,
85+
transaction: Optional[transaction.Transaction] = None,
86+
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
87+
timeout: Optional[float] = None,
8688
) -> Union[AsyncGenerator[List[AggregationResult], None]]:
87-
"""Runs the aggregation query.
89+
"""Internal method for stream(). Runs the aggregation query.
8890
89-
This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
91+
This sends a ``RunAggregationQuery`` RPC and then returns a generator which
9092
consumes each document returned in the stream of ``RunAggregationQueryResponse``
9193
messages.
9294
@@ -95,13 +97,14 @@ async def stream(
9597
allowed).
9698
9799
Args:
98-
transaction
99-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
100-
An existing transaction that this query will run in.
101-
retry (google.api_core.retry.Retry): Designation of what errors, if any,
102-
should be retried. Defaults to a system-specified policy.
103-
timeout (float): The timeout for this request. Defaults to a
104-
system-specified value.
100+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
101+
Transaction`]):
102+
An existing transaction that the query will run in.
103+
retry (Optional[google.api_core.retry.Retry]): Designation of what
104+
errors, if any, should be retried. Defaults to a
105+
system-specified policy.
106+
timeout (Optional[float]): The timeout for this request. Defaults
107+
to a system-specified value.
105108
106109
Yields:
107110
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
@@ -122,3 +125,40 @@ async def stream(
122125
async for response in response_iterator:
123126
result = _query_response_to_result(response)
124127
yield result
128+
129+
def stream(
130+
self,
131+
transaction: Optional[transaction.Transaction] = None,
132+
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
133+
timeout: Optional[float] = None,
134+
) -> "AsyncStreamGenerator[DocumentSnapshot]":
135+
"""Runs the aggregation query.
136+
137+
This sends a ``RunAggregationQuery`` RPC and then returns a generator
138+
which consumes each document returned in the stream of
139+
``RunAggregationQueryResponse`` messages.
140+
141+
If a ``transaction`` is used and it already has write operations added,
142+
this method cannot be used (i.e. read-after-write is not allowed).
143+
144+
Args:
145+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
146+
Transaction`]):
147+
An existing transaction that the query will run in.
148+
retry (Optional[google.api_core.retry.Retry]): Designation of what
149+
errors, if any, should be retried. Defaults to a
150+
system-specified policy.
151+
timeout (Optional[float]): The timeout for this request. Defaults
152+
to a system-specified value.
153+
154+
Returns:
155+
`AsyncStreamGenerator[DocumentSnapshot]`:
156+
A generator of the query results.
157+
"""
158+
159+
inner_generator = self._make_stream(
160+
transaction=transaction,
161+
retry=retry,
162+
timeout=timeout,
163+
)
164+
return AsyncStreamGenerator(inner_generator)

google/cloud/firestore_v1/async_collection.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@
2121
BaseCollectionReference,
2222
_item_to_document_ref,
2323
)
24-
from google.cloud.firestore_v1 import async_query, async_document, async_aggregation
24+
from google.cloud.firestore_v1 import (
25+
async_aggregation,
26+
async_document,
27+
async_query,
28+
transaction,
29+
)
2530

2631
from google.cloud.firestore_v1.document import DocumentReference
2732

28-
from typing import AsyncIterator
29-
from typing import Any, AsyncGenerator, Tuple
33+
from typing import Any, AsyncGenerator, Optional, Tuple, TYPE_CHECKING
3034

31-
# Types needed only for Type Hints
32-
from google.cloud.firestore_v1.transaction import Transaction
35+
if TYPE_CHECKING: # pragma: NO COVER
36+
from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
37+
from google.cloud.firestore_v1.base_document import DocumentSnapshot
3338

3439

3540
class AsyncCollectionReference(BaseCollectionReference[async_query.AsyncQuery]):
@@ -176,9 +181,9 @@ async def list_documents(
176181

177182
async def get(
178183
self,
179-
transaction: Transaction = None,
180-
retry: retries.AsyncRetry = gapic_v1.method.DEFAULT,
181-
timeout: float = None,
184+
transaction: Optional[transaction.Transaction] = None,
185+
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
186+
timeout: Optional[float] = None,
182187
) -> list:
183188
"""Read the documents in this collection.
184189
@@ -189,14 +194,14 @@ async def get(
189194
transaction
190195
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
191196
An existing transaction that this query will run in.
192-
retry (google.api_core.retry.Retry): Designation of what errors, if any,
193-
should be retried. Defaults to a system-specified policy.
194-
timeout (float): The timeout for this request. Defaults to a
195-
system-specified value.
197+
retry (Optional[google.api_core.retry.Retry]): Designation of what
198+
errors, if any, should be retried. Defaults to a
199+
system-specified policy.
200+
timeout (Otional[float]): The timeout for this request. Defaults
201+
to a system-specified value.
196202
197-
If a ``transaction`` is used and it already has write operations
198-
added, this method cannot be used (i.e. read-after-write is not
199-
allowed).
203+
If a ``transaction`` is used and it already has write operations added,
204+
this method cannot be used (i.e. read-after-write is not allowed).
200205
201206
Returns:
202207
list: The documents in this collection that match the query.
@@ -205,15 +210,15 @@ async def get(
205210

206211
return await query.get(transaction=transaction, **kwargs)
207212

208-
async def stream(
213+
def stream(
209214
self,
210-
transaction: Transaction = None,
211-
retry: retries.AsyncRetry = gapic_v1.method.DEFAULT,
212-
timeout: float = None,
213-
) -> AsyncIterator[async_document.DocumentSnapshot]:
215+
transaction: Optional[transaction.Transaction] = None,
216+
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
217+
timeout: Optional[float] = None,
218+
) -> "AsyncStreamGenerator[DocumentSnapshot]":
214219
"""Read the documents in this collection.
215220
216-
This sends a ``RunQuery`` RPC and then returns an iterator which
221+
This sends a ``RunQuery`` RPC and then returns a generator which
217222
consumes each document returned in the stream of ``RunQueryResponse``
218223
messages.
219224
@@ -232,16 +237,16 @@ async def stream(
232237
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
233238
Transaction`]):
234239
An existing transaction that the query will run in.
235-
retry (google.api_core.retry.Retry): Designation of what errors, if any,
236-
should be retried. Defaults to a system-specified policy.
237-
timeout (float): The timeout for this request. Defaults to a
238-
system-specified value.
240+
retry (Optional[google.api_core.retry.Retry]): Designation of what
241+
errors, if any, should be retried. Defaults to a
242+
system-specified policy.
243+
timeout (Optional[float]): The timeout for this request. Defaults
244+
to a system-specified value.
239245
240-
Yields:
241-
:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
242-
The next document that fulfills the query.
246+
Returns:
247+
`AsyncStreamGenerator[DocumentSnapshot]`: A generator of the query
248+
results.
243249
"""
244250
query, kwargs = self._prep_get_or_stream(retry, timeout)
245251

246-
async for d in query.stream(transaction=transaction, **kwargs):
247-
yield d # pytype: disable=name-error
252+
return query.stream(transaction=transaction, **kwargs)

0 commit comments

Comments
 (0)