
Commit 45bc8c4

kboroszko, mwalkiewicz, and gcf-owl-bot[bot] authored

feat: async execute query client (#1011)

Co-authored-by: Mateusz Walkiewicz <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent abe67c4 commit 45bc8c4

39 files changed: +3855 −78 lines

google/cloud/bigtable/data/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,7 @@
 from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
 from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
 from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
+from google.cloud.bigtable.data.exceptions import ParameterTypeInferenceFailed

 from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
 from google.cloud.bigtable.data._helpers import RowKeySamples
@@ -68,6 +69,7 @@
     "RetryExceptionGroup",
     "MutationsExceptionGroup",
     "ShardedReadRowsExceptionGroup",
+    "ParameterTypeInferenceFailed",
     "ShardedQuery",
     "TABLE_DEFAULT",
 )
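The newly exported ParameterTypeInferenceFailed is raised when execute_query (added further down in this commit) cannot infer a SQL type for a supplied parameter value. A minimal usage sketch, assuming placeholder project/instance/table names and SqlType.Bytes() as one possible explicit type:

import asyncio

from google.cloud.bigtable.data import (
    BigtableDataClientAsync,
    ParameterTypeInferenceFailed,
)
from google.cloud.bigtable.data.execute_query.metadata import SqlType


async def main() -> None:
    async with BigtableDataClientAsync(project="my-project") as client:
        try:
            # None carries no type information, so inference fails unless
            # parameter_types spells the type out explicitly.
            await client.execute_query(
                "SELECT * FROM my_table WHERE _key = @key",
                "my-instance",
                parameters={"key": None},
            )
        except ParameterTypeInferenceFailed:
            await client.execute_query(
                "SELECT * FROM my_table WHERE _key = @key",
                "my-instance",
                parameters={"key": None},
                parameter_types={"key": SqlType.Bytes()},
            )


asyncio.run(main())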

google/cloud/bigtable/data/_async/_mutate_rows.py

Lines changed: 3 additions & 1 deletion
@@ -84,7 +84,9 @@ def __init__(
                 f"all entries. Found {total_mutations}."
             )
         # create partial function to pass to trigger rpc call
-        metadata = _make_metadata(table.table_name, table.app_profile_id)
+        metadata = _make_metadata(
+            table.table_name, table.app_profile_id, instance_name=None
+        )
         self._gapic_fn = functools.partial(
             gapic_client.mutate_rows,
             table_name=table.table_name,

google/cloud/bigtable/data/_async/_read_rows.py

Lines changed: 1 addition & 2 deletions
@@ -102,8 +102,7 @@ def __init__(
         self.table = table
         self._predicate = retries.if_exception_type(*retryable_exceptions)
         self._metadata = _make_metadata(
-            table.table_name,
-            table.app_profile_id,
+            table.table_name, table.app_profile_id, instance_name=None
         )
         self._last_yielded_row_key: bytes | None = None
         self._remaining_count: int | None = self.request.rows_limit or None

google/cloud/bigtable/data/_async/client.py

Lines changed: 177 additions & 57 deletions
@@ -15,74 +15,88 @@

 from __future__ import annotations

+import asyncio
+from functools import partial
+import os
+import random
+import sys
+import time
 from typing import (
-    cast,
+    TYPE_CHECKING,
     Any,
     AsyncIterable,
+    Dict,
     Optional,
-    Set,
     Sequence,
-    TYPE_CHECKING,
+    Set,
+    Union,
+    cast,
 )
-
-import asyncio
-import grpc
-import time
 import warnings
-import sys
-import random
-import os

-from functools import partial
+from google.api_core import client_options as client_options_lib
+from google.api_core import retry as retries
+from google.api_core.exceptions import Aborted, DeadlineExceeded, ServiceUnavailable
+import google.auth._default
+import google.auth.credentials
+from google.cloud.client import ClientWithProject
+from google.cloud.environment_vars import BIGTABLE_EMULATOR  # type: ignore
+import grpc

+from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
+from google.cloud.bigtable.data.execute_query._async.execute_query_iterator import (
+    ExecuteQueryIteratorAsync,
+)
+from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
+from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync
+from google.cloud.bigtable.data._async.mutations_batcher import (
+    _MB_SIZE,
+    MutationsBatcherAsync,
+)
+from google.cloud.bigtable.data._helpers import (
+    _CONCURRENCY_LIMIT,
+    TABLE_DEFAULT,
+    _attempt_timeout_generator,
+    _get_error_type,
+    _get_retryable_errors,
+    _get_timeouts,
+    _make_metadata,
+    _retry_exception_factory,
+    _validate_timeouts,
+    _WarmedInstanceKey,
+)
+from google.cloud.bigtable.data.exceptions import (
+    FailedQueryShardError,
+    ShardedReadRowsExceptionGroup,
+)
+from google.cloud.bigtable.data.mutations import Mutation, RowMutationEntry
+from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
+from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
+from google.cloud.bigtable.data.row import Row
+from google.cloud.bigtable.data.row_filters import (
+    CellsRowLimitFilter,
+    RowFilter,
+    RowFilterChain,
+    StripValueTransformerFilter,
+)
+from google.cloud.bigtable.data.execute_query.values import ExecuteQueryValueType
+from google.cloud.bigtable.data.execute_query.metadata import SqlType
+from google.cloud.bigtable.data.execute_query._parameters_formatting import (
+    _format_execute_query_params,
+)
+from google.cloud.bigtable_v2.services.bigtable.async_client import (
+    DEFAULT_CLIENT_INFO,
+    BigtableAsyncClient,
+)
 from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
-from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
-from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
 from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
     PooledBigtableGrpcAsyncIOTransport,
     PooledChannel,
 )
 from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
-from google.cloud.client import ClientWithProject
-from google.cloud.environment_vars import BIGTABLE_EMULATOR  # type: ignore
-from google.api_core import retry as retries
-from google.api_core.exceptions import DeadlineExceeded
-from google.api_core.exceptions import ServiceUnavailable
-from google.api_core.exceptions import Aborted
-from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync
-
-import google.auth.credentials
-import google.auth._default
-from google.api_core import client_options as client_options_lib
-from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
-from google.cloud.bigtable.data.row import Row
-from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
-from google.cloud.bigtable.data.exceptions import FailedQueryShardError
-from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
-
-from google.cloud.bigtable.data.mutations import Mutation, RowMutationEntry
-from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
-from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
-from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
-from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
-from google.cloud.bigtable.data._helpers import _make_metadata
-from google.cloud.bigtable.data._helpers import _retry_exception_factory
-from google.cloud.bigtable.data._helpers import _validate_timeouts
-from google.cloud.bigtable.data._helpers import _get_retryable_errors
-from google.cloud.bigtable.data._helpers import _get_timeouts
-from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
-from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync
-from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE
-from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
-from google.cloud.bigtable.data.row_filters import RowFilter
-from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
-from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
-from google.cloud.bigtable.data.row_filters import RowFilterChain
-

 if TYPE_CHECKING:
-    from google.cloud.bigtable.data._helpers import RowKeySamples
-    from google.cloud.bigtable.data._helpers import ShardedQuery
+    from google.cloud.bigtable.data._helpers import RowKeySamples, ShardedQuery


 class BigtableDataClientAsync(ClientWithProject):
@@ -315,7 +329,9 @@ async def _manage_channel(
             next_refresh = random.uniform(refresh_interval_min, refresh_interval_max)
             next_sleep = next_refresh - (time.time() - start_timestamp)

-    async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
+    async def _register_instance(
+        self, instance_id: str, owner: Union[TableAsync, ExecuteQueryIteratorAsync]
+    ) -> None:
         """
         Registers an instance with the client, and warms the channel pool
         for the instance
@@ -346,7 +362,7 @@ async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
             self._start_background_channel_refresh()

     async def _remove_instance_registration(
-        self, instance_id: str, owner: TableAsync
+        self, instance_id: str, owner: Union[TableAsync, ExecuteQueryIteratorAsync]
     ) -> bool:
         """
         Removes an instance from the client's registered instances, to prevent
@@ -416,6 +432,102 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAs
         """
         return TableAsync(self, instance_id, table_id, *args, **kwargs)

+    async def execute_query(
+        self,
+        query: str,
+        instance_id: str,
+        *,
+        parameters: Dict[str, ExecuteQueryValueType] | None = None,
+        parameter_types: Dict[str, SqlType.Type] | None = None,
+        app_profile_id: str | None = None,
+        operation_timeout: float = 600,
+        attempt_timeout: float | None = 20,
+        retryable_errors: Sequence[type[Exception]] = (
+            DeadlineExceeded,
+            ServiceUnavailable,
+            Aborted,
+        ),
+    ) -> "ExecuteQueryIteratorAsync":
+        """
+        Executes an SQL query on an instance.
+        Returns an iterator to asynchronously stream back columns from selected rows.
+
+        Failed requests within operation_timeout will be retried based on the
+        retryable_errors list until operation_timeout is reached.
+
+        Args:
+            - query: Query to be run on the Bigtable instance. The query can use ``@param``
+                placeholders to use parameter interpolation on the server. Values for all
+                parameters should be provided in ``parameters``. Types of parameters are
+                inferred but should be provided in ``parameter_types`` if the inference is
+                not possible (i.e. when a value can be None, an empty list, or an empty dict).
+            - instance_id: The Bigtable instance ID to perform the query on.
+                instance_id is combined with the client's project to fully
+                specify the instance.
+            - parameters: Dictionary with values for all parameters used in the ``query``.
+            - parameter_types: Dictionary with types of parameters used in the ``query``.
+                Required to contain entries only for parameters whose type cannot be
+                detected automatically (i.e. the value can be None, an empty list, or
+                an empty dict).
+            - app_profile_id: The app profile to associate with requests.
+                https://cloud.google.com/bigtable/docs/app-profiles
+            - operation_timeout: the time budget for the entire operation, in seconds.
+                Failed requests will be retried within the budget.
+                Defaults to 600 seconds.
+            - attempt_timeout: the time budget for an individual network request, in seconds.
+                If it takes longer than this time to complete, the request will be cancelled with
+                a DeadlineExceeded exception, and a retry will be attempted.
+                Defaults to 20 seconds.
+                If None, defaults to operation_timeout.
+            - retryable_errors: a list of errors that will be retried if encountered.
+                Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted).
+        Returns:
+            - an asynchronous iterator that yields rows returned by the query
+        Raises:
+            - DeadlineExceeded: raised after operation timeout;
+                will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
+                from any retries that failed
+            - GoogleAPIError: raised if the request encounters an unrecoverable error
+        """
+        warnings.warn(
+            "ExecuteQuery is in preview and may change in the future.",
+            category=RuntimeWarning,
+        )
+
+        retryable_excs = [_get_error_type(e) for e in retryable_errors]
+
+        pb_params = _format_execute_query_params(parameters, parameter_types)
+
+        instance_name = self._gapic_client.instance_path(self.project, instance_id)
+
+        request_body = {
+            "instance_name": instance_name,
+            "app_profile_id": app_profile_id,
+            "query": query,
+            "params": pb_params,
+            "proto_format": {},
+        }
+
+        # app_profile_id should be set to an empty string for ExecuteQueryRequest only
+        app_profile_id_for_metadata = app_profile_id or ""
+
+        req_metadata = _make_metadata(
+            table_name=None,
+            app_profile_id=app_profile_id_for_metadata,
+            instance_name=instance_name,
+        )
+
+        return ExecuteQueryIteratorAsync(
+            self,
+            instance_id,
+            app_profile_id,
+            request_body,
+            attempt_timeout,
+            operation_timeout,
+            req_metadata,
+            retryable_excs,
+        )
+
     async def __aenter__(self):
         self._start_background_channel_refresh()
         return self
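For context, a minimal end-to-end sketch of calling the new method. This is an illustration, not code from the commit: the project, instance, table, and column names are placeholders, and row fields are accessed by name via the result rows the iterator yields.

import asyncio

from google.cloud.bigtable.data import BigtableDataClientAsync


async def main() -> None:
    async with BigtableDataClientAsync(project="my-project") as client:
        # execute_query returns an ExecuteQueryIteratorAsync; rows stream
        # back asynchronously as the server produces them.
        result = await client.execute_query(
            "SELECT _key, cf['col'] AS col FROM my_table WHERE _key = @key",
            "my-instance",
            parameters={"key": b"row-key-1"},
        )
        async for row in result:
            print(row["_key"], row["col"])


asyncio.run(main())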
@@ -893,7 +1005,9 @@ async def sample_row_keys(
         sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

         # prepare request
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
+        metadata = _make_metadata(
+            self.table_name, self.app_profile_id, instance_name=None
+        )

         async def execute_rpc():
             results = await self.client._gapic_client.sample_row_keys(
@@ -1029,7 +1143,9 @@ async def mutate_row(
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
             timeout=attempt_timeout,
-            metadata=_make_metadata(self.table_name, self.app_profile_id),
+            metadata=_make_metadata(
+                self.table_name, self.app_profile_id, instance_name=None
+            ),
             retry=None,
         )
         return await retries.retry_target_async(
@@ -1147,7 +1263,9 @@ async def check_and_mutate_row(
         ):
             false_case_mutations = [false_case_mutations]
         false_case_list = [m._to_pb() for m in false_case_mutations or []]
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
+        metadata = _make_metadata(
+            self.table_name, self.app_profile_id, instance_name=None
+        )
         result = await self.client._gapic_client.check_and_mutate_row(
             true_mutations=true_case_list,
             false_mutations=false_case_list,
@@ -1198,7 +1316,9 @@ async def read_modify_write_row(
             rules = [rules]
         if not rules:
             raise ValueError("rules must contain at least one item")
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
+        metadata = _make_metadata(
+            self.table_name, self.app_profile_id, instance_name=None
+        )
         result = await self.client._gapic_client.read_modify_write_row(
             rules=[rule._to_pb() for rule in rules],
             row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
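A recurring change across these hunks is that _make_metadata now takes an explicit instance_name: table-scoped RPCs pass instance_name=None, while ExecuteQuery passes an instance name and no table name. A minimal sketch of what such a routing-header helper could look like; this is an assumption-laden illustration, not the library's actual private implementation (the function name and the ValueError guard are invented here):

from typing import List, Optional, Tuple


def make_routing_metadata(
    table_name: Optional[str],
    app_profile_id: Optional[str],
    instance_name: Optional[str],
) -> List[Tuple[str, str]]:
    # Exactly one resource name scopes the request: a table for data-plane
    # RPCs, or an instance for ExecuteQuery.
    if (table_name is None) == (instance_name is None):
        raise ValueError("pass exactly one of table_name or instance_name")
    params = []
    if table_name is not None:
        params.append(f"table_name={table_name}")
    if instance_name is not None:
        params.append(f"name={instance_name}")
    if app_profile_id is not None:
        params.append(f"app_profile_id={app_profile_id}")
    # gRPC routing metadata consumed by the Bigtable frontend.
    return [("x-goog-request-params", "&".join(params))]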
