From 4616cd58d3c6da641fb881ce99a87dcdedc20ba2 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 8 Nov 2021 17:21:24 +0100 Subject: [PATCH 01/10] deps: support OpenTelemetry >= 1.1.0 (#1050) --- .../cloud/bigquery/opentelemetry_tracing.py | 11 ++++--- setup.py | 6 ++-- testing/constraints-3.6.txt | 6 ++-- tests/unit/test_client.py | 31 +++++++++++++------ tests/unit/test_opentelemetry_tracing.py | 21 ++++++++----- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/google/cloud/bigquery/opentelemetry_tracing.py b/google/cloud/bigquery/opentelemetry_tracing.py index 57f258ac4..b1a1027d2 100644 --- a/google/cloud/bigquery/opentelemetry_tracing.py +++ b/google/cloud/bigquery/opentelemetry_tracing.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) try: from opentelemetry import trace - from opentelemetry.instrumentation.utils import http_status_to_canonical_code + from opentelemetry.instrumentation.utils import http_status_to_status_code from opentelemetry.trace.status import Status HAS_OPENTELEMETRY = True @@ -65,9 +65,10 @@ def create_span(name, attributes=None, client=None, job_ref=None): if not _warned_telemetry: logger.debug( "This service is instrumented using OpenTelemetry. " - "OpenTelemetry could not be imported; please " - "add opentelemetry-api and opentelemetry-instrumentation " - "packages in order to get BigQuery Tracing data." + "OpenTelemetry or one of its components could not be imported; " + "please add compatible versions of opentelemetry-api and " + "opentelemetry-instrumentation packages in order to get BigQuery " + "Tracing data." ) _warned_telemetry = True @@ -81,7 +82,7 @@ def create_span(name, attributes=None, client=None, job_ref=None): yield span except GoogleAPICallError as error: if error.code is not None: - span.set_status(Status(http_status_to_canonical_code(error.code))) + span.set_status(Status(http_status_to_status_code(error.code))) raise diff --git a/setup.py b/setup.py index db69c45b1..5c0b80f7c 100644 --- a/setup.py +++ b/setup.py @@ -63,9 +63,9 @@ "bignumeric_type": pyarrow_dep, "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], "opentelemetry": [ - "opentelemetry-api >= 0.11b0", - "opentelemetry-sdk >= 0.11b0", - "opentelemetry-instrumentation >= 0.11b0", + "opentelemetry-api >= 1.1.0", + "opentelemetry-sdk >= 1.1.0", + "opentelemetry-instrumentation >= 0.20b0", ], } diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 59913d588..f967077bc 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -11,9 +11,9 @@ google-cloud-bigquery-storage==2.0.0 google-cloud-core==1.4.1 google-resumable-media==0.6.0 grpcio==1.38.1 -opentelemetry-api==0.11b0 -opentelemetry-instrumentation==0.11b0 -opentelemetry-sdk==0.11b0 +opentelemetry-api==1.1.0 +opentelemetry-instrumentation==0.20b0 +opentelemetry-sdk==1.1.0 pandas==0.24.2 proto-plus==1.10.0 protobuf==3.12.0 diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 11b336728..97aa2eedb 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -36,16 +36,24 @@ import pandas except (ImportError, AttributeError): # pragma: NO COVER pandas = None + try: import opentelemetry - from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, - ) -except (ImportError, AttributeError): # pragma: NO COVER +except ImportError: opentelemetry = None + +if 
opentelemetry is not None: + try: + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + except (ImportError, AttributeError) as exc: # pragma: NO COVER + msg = "Error importing from opentelemetry, is the installed version compatible?" + raise ImportError(msg) from exc + try: import pyarrow except (ImportError, AttributeError): # pragma: NO COVER @@ -784,9 +792,12 @@ def test_span_status_is_set(self): tracer_provider = TracerProvider() memory_exporter = InMemorySpanExporter() - span_processor = SimpleExportSpanProcessor(memory_exporter) + span_processor = SimpleSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) - trace.set_tracer_provider(tracer_provider) + + # OpenTelemetry API >= 0.12b0 does not allow overriding the tracer once + # initialized, thus directly override the internal global var. + tracer_patcher = mock.patch.object(trace, "_TRACER_PROVIDER", tracer_provider) creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) @@ -797,7 +808,7 @@ def test_span_status_is_set(self): full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) - with pytest.raises(google.api_core.exceptions.AlreadyExists): + with pytest.raises(google.api_core.exceptions.AlreadyExists), tracer_patcher: client.create_routine(routine) span_list = memory_exporter.get_finished_spans() diff --git a/tests/unit/test_opentelemetry_tracing.py b/tests/unit/test_opentelemetry_tracing.py index 726e3cf6f..cc1ca7903 100644 --- a/tests/unit/test_opentelemetry_tracing.py +++ b/tests/unit/test_opentelemetry_tracing.py @@ -20,14 +20,21 @@ try: import opentelemetry - from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, - ) except ImportError: # pragma: NO COVER opentelemetry = None + +if opentelemetry is not None: + try: + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + except (ImportError, AttributeError) as exc: # pragma: NO COVER + msg = "Error importing from opentelemetry, is the installed version compatible?" 
+ raise ImportError(msg) from exc + import pytest from google.cloud.bigquery import opentelemetry_tracing @@ -42,7 +49,7 @@ def setup(): importlib.reload(opentelemetry_tracing) tracer_provider = TracerProvider() memory_exporter = InMemorySpanExporter() - span_processor = SimpleExportSpanProcessor(memory_exporter) + span_processor = SimpleSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) trace.set_tracer_provider(tracer_provider) yield memory_exporter From 66b3dd9f9aec3fda9610a3ceec8d8a477f2ab3b9 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 8 Nov 2021 22:47:37 +0100 Subject: [PATCH 02/10] process: make mypy happy with type annotations (#1036) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * process: add mypy types check to nox sessions * Ignore type errors for not annotated modules Several dependencies lack type annotations, or they don't advertise themselves as type-annotated. We do not want `mypy` to complain about these. * Fix mypy complaints (batch 1) * Fix mypy complaints (batch 2) * Fix mypy complaints (batch 3) * Fix mypy false positive errors * Simplify external config options instantiation * Do not ignore api-core in type checks More recent releases of google-api-core have typing enabled. * Remove unneeded __hash__ = None lines * Use an alias for timeout type in client.py * Fix PathLike subscription error in pre-Python 3.9 * Fix a typo in docstring Co-authored-by: Tim Swast * Add mypy to the list of nox sessions to run * Fix opentelemetry type error The Opentelemetry APi has changed from the minimum version the BigQuery client currently uses, we thus need to bound the maximum Opentelemetry version. In addition, that maximum version does not yet support type checks, thus it is ignored. 
* 🦉 Updates from OwlBot See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Exclude type-checking code from coverage * Fix patching opentelemetry tracer pvoider * Adjust get_job() return type, ignore opentelemetry Co-authored-by: Tim Swast Co-authored-by: Owl Bot --- google/cloud/__init__.py | 2 +- google/cloud/bigquery/_helpers.py | 4 +- google/cloud/bigquery/_http.py | 2 +- google/cloud/bigquery/_pandas_helpers.py | 8 +- google/cloud/bigquery/_tqdm_helpers.py | 2 +- google/cloud/bigquery/client.py | 211 +++++++++--------- google/cloud/bigquery/dataset.py | 2 +- google/cloud/bigquery/dbapi/_helpers.py | 16 +- google/cloud/bigquery/dbapi/cursor.py | 2 +- google/cloud/bigquery/external_config.py | 30 ++- google/cloud/bigquery/job/base.py | 4 +- google/cloud/bigquery/job/query.py | 31 ++- .../bigquery/magics/line_arg_parser/lexer.py | 2 +- google/cloud/bigquery/magics/magics.py | 8 +- google/cloud/bigquery/model.py | 4 +- .../cloud/bigquery/opentelemetry_tracing.py | 2 +- google/cloud/bigquery/query.py | 8 +- google/cloud/bigquery/retry.py | 2 +- google/cloud/bigquery/routine/routine.py | 2 +- google/cloud/bigquery/schema.py | 4 +- google/cloud/bigquery/table.py | 28 +-- noxfile.py | 19 +- tests/unit/test_opentelemetry_tracing.py | 9 +- 23 files changed, 220 insertions(+), 182 deletions(-) diff --git a/google/cloud/__init__.py b/google/cloud/__init__.py index 8fcc60e2b..8e60d8439 100644 --- a/google/cloud/__init__.py +++ b/google/cloud/__init__.py @@ -21,4 +21,4 @@ except ImportError: import pkgutil - __path__ = pkgutil.extend_path(__path__, __name__) + __path__ = pkgutil.extend_path(__path__, __name__) # type: ignore diff --git a/google/cloud/bigquery/_helpers.py b/google/cloud/bigquery/_helpers.py index e95d38545..e2ca7fa07 100644 --- a/google/cloud/bigquery/_helpers.py +++ b/google/cloud/bigquery/_helpers.py @@ -22,7 +22,7 @@ from typing import Any, Optional, Union from dateutil import relativedelta -from google.cloud._helpers import UTC +from google.cloud._helpers import UTC # type: ignore from google.cloud._helpers import _date_from_iso8601_date from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _RFC3339_MICROS @@ -126,7 +126,7 @@ def __init__(self): def installed_version(self) -> packaging.version.Version: """Return the parsed version of pyarrow.""" if self._installed_version is None: - import pyarrow + import pyarrow # type: ignore self._installed_version = packaging.version.parse( # Use 0.0.0, since it is earlier than any released version. 
diff --git a/google/cloud/bigquery/_http.py b/google/cloud/bigquery/_http.py index 81e7922e6..f7207f32e 100644 --- a/google/cloud/bigquery/_http.py +++ b/google/cloud/bigquery/_http.py @@ -17,7 +17,7 @@ import os import pkg_resources -from google.cloud import _http # pytype: disable=import-error +from google.cloud import _http # type: ignore # pytype: disable=import-error from google.cloud.bigquery import __version__ diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 0cb851469..de6356c2a 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -21,7 +21,7 @@ import warnings try: - import pandas + import pandas # type: ignore except ImportError: # pragma: NO COVER pandas = None else: @@ -29,7 +29,7 @@ try: # _BaseGeometry is used to detect shapely objevys in `bq_to_arrow_array` - from shapely.geometry.base import BaseGeometry as _BaseGeometry + from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore except ImportError: # pragma: NO COVER # No shapely, use NoneType for _BaseGeometry as a placeholder. _BaseGeometry = type(None) @@ -43,7 +43,7 @@ def _to_wkb(): # - Avoid extra work done by `shapely.wkb.dumps` that we don't need. # - Caches the WKBWriter (and write method lookup :) ) # - Avoids adding WKBWriter, lgeos, and notnull to the module namespace. - from shapely.geos import WKBWriter, lgeos + from shapely.geos import WKBWriter, lgeos # type: ignore write = WKBWriter(lgeos).write notnull = pandas.notnull @@ -574,7 +574,7 @@ def dataframe_to_parquet( """ pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) - import pyarrow.parquet + import pyarrow.parquet # type: ignore kwargs = ( {"use_compliant_nested_type": parquet_use_compliant_nested_type} diff --git a/google/cloud/bigquery/_tqdm_helpers.py b/google/cloud/bigquery/_tqdm_helpers.py index 99e720e2b..632f70f87 100644 --- a/google/cloud/bigquery/_tqdm_helpers.py +++ b/google/cloud/bigquery/_tqdm_helpers.py @@ -21,7 +21,7 @@ import warnings try: - import tqdm + import tqdm # type: ignore except ImportError: # pragma: NO COVER tqdm = None diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 4bdd43e8f..3e641e195 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -28,12 +28,23 @@ import math import os import tempfile -from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union +import typing +from typing import ( + Any, + BinaryIO, + Dict, + Iterable, + List, + Optional, + Sequence, + Tuple, + Union, +) import uuid import warnings from google import resumable_media # type: ignore -from google.resumable_media.requests import MultipartUpload +from google.resumable_media.requests import MultipartUpload # type: ignore from google.resumable_media.requests import ResumableUpload import google.api_core.client_options @@ -41,16 +52,16 @@ from google.api_core.iam import Policy from google.api_core import page_iterator from google.api_core import retry as retries -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud import exceptions # pytype: disable=import-error -from google.cloud.client import ClientWithProject # pytype: disable=import-error +from google.cloud.client import ClientWithProject # type: ignore # pytype: disable=import-error try: from google.cloud.bigquery_storage_v1.services.big_query_read.client import ( DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO, ) except 
ImportError: - DEFAULT_BQSTORAGE_CLIENT_INFO = None + DEFAULT_BQSTORAGE_CLIENT_INFO = None # type: ignore from google.cloud.bigquery._helpers import _del_sub_prop from google.cloud.bigquery._helpers import _get_sub_prop @@ -100,6 +111,11 @@ pyarrow = _helpers.PYARROW_VERSIONS.try_import() +TimeoutType = Union[float, None] + +if typing.TYPE_CHECKING: # pragma: NO COVER + # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition. + PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]] _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB _MAX_MULTIPART_SIZE = 5 * 1024 * 1024 @@ -248,7 +264,7 @@ def get_service_account_email( self, project: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> str: """Get the email address of the project's BigQuery service account @@ -295,7 +311,7 @@ def list_projects( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """List projects for the project associated with this client. @@ -361,7 +377,7 @@ def list_datasets( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """List datasets for the project associated with this client. @@ -400,7 +416,7 @@ def list_datasets( Iterator of :class:`~google.cloud.bigquery.dataset.DatasetListItem`. associated with the project. """ - extra_params = {} + extra_params: Dict[str, Any] = {} if project is None: project = self.project if include_all: @@ -526,12 +542,12 @@ def _ensure_bqstorage_client( bqstorage_client = bigquery_storage.BigQueryReadClient( credentials=self._credentials, client_options=client_options, - client_info=client_info, + client_info=client_info, # type: ignore # (None is also accepted) ) return bqstorage_client - def _dataset_from_arg(self, dataset): + def _dataset_from_arg(self, dataset) -> Union[Dataset, DatasetReference]: if isinstance(dataset, str): dataset = DatasetReference.from_string( dataset, default_project=self.project @@ -552,7 +568,7 @@ def create_dataset( dataset: Union[str, Dataset, DatasetReference, DatasetListItem], exists_ok: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dataset: """API call: create the dataset via a POST request. @@ -627,7 +643,7 @@ def create_routine( routine: Routine, exists_ok: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Routine: """[Beta] Create a routine via a POST request. 
@@ -682,7 +698,7 @@ def create_table( table: Union[str, Table, TableReference, TableListItem], exists_ok: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Table: """API call: create a table via a PUT request @@ -765,7 +781,7 @@ def get_dataset( self, dataset_ref: Union[DatasetReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dataset: """Fetch the dataset referenced by ``dataset_ref`` @@ -809,7 +825,7 @@ def get_iam_policy( table: Union[Table, TableReference, TableListItem, str], requested_policy_version: int = 1, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Policy: table = _table_arg_to_table_ref(table, default_project=self.project) @@ -838,7 +854,7 @@ def set_iam_policy( policy: Policy, updateMask: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Policy: table = _table_arg_to_table_ref(table, default_project=self.project) @@ -870,7 +886,7 @@ def test_iam_permissions( table: Union[Table, TableReference, TableListItem, str], permissions: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dict[str, Any]: table = _table_arg_to_table_ref(table, default_project=self.project) @@ -894,7 +910,7 @@ def get_model( self, model_ref: Union[ModelReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Model: """[Beta] Fetch the model referenced by ``model_ref``. @@ -937,7 +953,7 @@ def get_routine( self, routine_ref: Union[Routine, RoutineReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Routine: """[Beta] Get the routine referenced by ``routine_ref``. @@ -981,7 +997,7 @@ def get_table( self, table: Union[Table, TableReference, TableListItem, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Table: """Fetch the table referenced by ``table``. @@ -1024,7 +1040,7 @@ def update_dataset( dataset: Dataset, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dataset: """Change some fields of a dataset. @@ -1071,7 +1087,7 @@ def update_dataset( """ partial = dataset._build_resource(fields) if dataset.etag is not None: - headers = {"If-Match": dataset.etag} + headers: Optional[Dict[str, str]] = {"If-Match": dataset.etag} else: headers = None path = dataset.path @@ -1094,7 +1110,7 @@ def update_model( model: Model, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Model: """[Beta] Change some fields of a model. 
@@ -1135,7 +1151,7 @@ def update_model( """ partial = model._build_resource(fields) if model.etag: - headers = {"If-Match": model.etag} + headers: Optional[Dict[str, str]] = {"If-Match": model.etag} else: headers = None path = model.path @@ -1158,7 +1174,7 @@ def update_routine( routine: Routine, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Routine: """[Beta] Change some fields of a routine. @@ -1205,7 +1221,7 @@ def update_routine( """ partial = routine._build_resource(fields) if routine.etag: - headers = {"If-Match": routine.etag} + headers: Optional[Dict[str, str]] = {"If-Match": routine.etag} else: headers = None @@ -1232,7 +1248,7 @@ def update_table( table: Table, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Table: """Change some fields of a table. @@ -1273,7 +1289,7 @@ def update_table( """ partial = table._build_resource(fields) if table.etag is not None: - headers = {"If-Match": table.etag} + headers: Optional[Dict[str, str]] = {"If-Match": table.etag} else: headers = None @@ -1298,7 +1314,7 @@ def list_models( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """[Beta] List models in the dataset. @@ -1366,7 +1382,7 @@ def api_request(*args, **kwargs): max_results=max_results, page_size=page_size, ) - result.dataset = dataset + result.dataset = dataset # type: ignore return result def list_routines( @@ -1375,7 +1391,7 @@ def list_routines( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """[Beta] List routines in the dataset. @@ -1443,7 +1459,7 @@ def api_request(*args, **kwargs): max_results=max_results, page_size=page_size, ) - result.dataset = dataset + result.dataset = dataset # type: ignore return result def list_tables( @@ -1452,7 +1468,7 @@ def list_tables( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """List tables in the dataset. @@ -1519,7 +1535,7 @@ def api_request(*args, **kwargs): max_results=max_results, page_size=page_size, ) - result.dataset = dataset + result.dataset = dataset # type: ignore return result def delete_dataset( @@ -1527,7 +1543,7 @@ def delete_dataset( dataset: Union[Dataset, DatasetReference, DatasetListItem, str], delete_contents: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """Delete a dataset. 
@@ -1586,7 +1602,7 @@ def delete_model( self, model: Union[Model, ModelReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """[Beta] Delete a model @@ -1640,7 +1656,7 @@ def delete_job_metadata( project: Optional[str] = None, location: Optional[str] = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ): """[Beta] Delete job metadata from job history. @@ -1703,7 +1719,7 @@ def delete_routine( self, routine: Union[Routine, RoutineReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """[Beta] Delete a routine. @@ -1757,7 +1773,7 @@ def delete_table( self, table: Union[Table, TableReference, TableListItem, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """Delete a table @@ -1811,7 +1827,7 @@ def _get_query_results( project: str = None, timeout_ms: int = None, location: str = None, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> _QueryResults: """Get the query results object for a query job. @@ -1836,7 +1852,7 @@ def _get_query_results( A new ``_QueryResults`` instance. """ - extra_params = {"maxResults": 0} + extra_params: Dict[str, Any] = {"maxResults": 0} if timeout is not None: timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT) @@ -1870,20 +1886,18 @@ def _get_query_results( ) return _QueryResults.from_api_repr(resource) - def job_from_resource(self, resource: dict) -> job.UnknownJob: + def job_from_resource( + self, resource: dict + ) -> Union[ + job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob, + ]: """Detect correct job type from resource and instantiate. Args: resource (Dict): one job resource from API response Returns: - Union[ \ - google.cloud.bigquery.job.LoadJob, \ - google.cloud.bigquery.job.CopyJob, \ - google.cloud.bigquery.job.ExtractJob, \ - google.cloud.bigquery.job.QueryJob \ - ]: - The job instance, constructed via the resource. + The job instance, constructed via the resource. """ config = resource.get("configuration", {}) if "load" in config: @@ -1900,7 +1914,7 @@ def create_job( self, job_config: dict, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]: """Create a new job. 
Args: @@ -1933,7 +1947,7 @@ def create_job( return self.load_table_from_uri( source_uris, destination, - job_config=load_job_config, + job_config=typing.cast(LoadJobConfig, load_job_config), retry=retry, timeout=timeout, ) @@ -1953,7 +1967,7 @@ def create_job( return self.copy_table( sources, destination, - job_config=copy_job_config, + job_config=typing.cast(CopyJobConfig, copy_job_config), retry=retry, timeout=timeout, ) @@ -1973,7 +1987,7 @@ def create_job( return self.extract_table( source, destination_uris, - job_config=extract_job_config, + job_config=typing.cast(ExtractJobConfig, extract_job_config), retry=retry, timeout=timeout, source_type=source_type, @@ -1986,32 +2000,30 @@ def create_job( ) query = _get_sub_prop(copy_config, ["query", "query"]) return self.query( - query, job_config=query_job_config, retry=retry, timeout=timeout + query, + job_config=typing.cast(QueryJobConfig, query_job_config), + retry=retry, + timeout=timeout, ) else: raise TypeError("Invalid job configuration received.") def get_job( self, - job_id: str, + job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob], project: str = None, location: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, - ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]: + timeout: TimeoutType = DEFAULT_TIMEOUT, + ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]: """Fetch a job for the project associated with this client. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get Args: - job_id (Union[ \ - str, \ - google.cloud.bigquery.job.LoadJob, \ - google.cloud.bigquery.job.CopyJob, \ - google.cloud.bigquery.job.ExtractJob, \ - google.cloud.bigquery.job.QueryJob \ - ]): Job identifier. + job_id: + Job identifier. Keyword Arguments: project (Optional[str]): @@ -2026,13 +2038,7 @@ def get_job( before using ``retry``. Returns: - Union[ \ - google.cloud.bigquery.job.LoadJob, \ - google.cloud.bigquery.job.CopyJob, \ - google.cloud.bigquery.job.ExtractJob, \ - google.cloud.bigquery.job.QueryJob \ - ]: - Job instance, based on the resource returned by the API. + Job instance, based on the resource returned by the API. """ extra_params = {"projection": "full"} @@ -2071,7 +2077,7 @@ def cancel_job( project: str = None, location: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]: """Attempt to cancel a job from a job ID. 
@@ -2137,7 +2143,11 @@ def cancel_job( timeout=timeout, ) - return self.job_from_resource(resource["job"]) + job_instance = self.job_from_resource(resource["job"]) # never an UnknownJob + + return typing.cast( + Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob], job_instance, + ) def list_jobs( self, @@ -2148,7 +2158,7 @@ def list_jobs( all_users: bool = None, state_filter: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, min_creation_time: datetime.datetime = None, max_creation_time: datetime.datetime = None, page_size: int = None, @@ -2263,9 +2273,9 @@ def load_table_from_uri( project: str = None, job_config: LoadJobConfig = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: - """Starts a job for loading data into a table from CloudStorage. + """Starts a job for loading data into a table from Cloud Storage. See https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload @@ -2348,7 +2358,7 @@ def load_table_from_file( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of this table from a file-like object. @@ -2439,7 +2449,7 @@ def load_table_from_file( except resumable_media.InvalidResponse as exc: raise exceptions.from_http_response(exc.response) - return self.job_from_resource(response.json()) + return typing.cast(LoadJob, self.job_from_resource(response.json())) def load_table_from_dataframe( self, @@ -2452,7 +2462,7 @@ def load_table_from_dataframe( project: str = None, job_config: LoadJobConfig = None, parquet_compression: str = "snappy", - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a pandas DataFrame. @@ -2592,7 +2602,7 @@ def load_table_from_dataframe( try: table = self.get_table(destination) except core_exceptions.NotFound: - table = None + pass else: columns_and_indexes = frozenset( name @@ -2707,7 +2717,7 @@ def load_table_from_json( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a JSON string or dict. @@ -2995,7 +3005,7 @@ def copy_table( project: str = None, job_config: CopyJobConfig = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.CopyJob: """Copy one or more tables to another table. @@ -3101,7 +3111,7 @@ def extract_table( project: str = None, job_config: ExtractJobConfig = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, source_type: str = "Table", ) -> job.ExtractJob: """Start a job to extract a table into Cloud Storage files. @@ -3200,7 +3210,7 @@ def query( location: str = None, project: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, job_retry: retries.Retry = DEFAULT_JOB_RETRY, ) -> job.QueryJob: """Run a SQL query. 
@@ -3357,7 +3367,7 @@ def insert_rows( table: Union[Table, TableReference, str], rows: Union[Iterable[Tuple], Iterable[Dict]], selected_fields: Sequence[SchemaField] = None, - **kwargs: dict, + **kwargs, ) -> Sequence[dict]: """Insert rows into a table via the streaming API. @@ -3482,7 +3492,7 @@ def insert_rows_json( ignore_unknown_values: bool = None, template_suffix: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Sequence[dict]: """Insert rows into a table without applying local type conversions. @@ -3550,8 +3560,8 @@ def insert_rows_json( # insert_rows_json doesn't need the table schema. It's not doing any # type conversions. table = _table_arg_to_table_ref(table, default_project=self.project) - rows_info = [] - data = {"rows": rows_info} + rows_info: List[Any] = [] + data: Dict[str, Any] = {"rows": rows_info} if row_ids is None: warnings.warn( @@ -3569,7 +3579,7 @@ def insert_rows_json( raise TypeError(msg) for i, row in enumerate(json_rows): - info = {"json": row} + info: Dict[str, Any] = {"json": row} if row_ids is AutoRowIDs.GENERATE_UUID: info["insertId"] = str(uuid.uuid4()) @@ -3618,7 +3628,7 @@ def list_partitions( self, table: Union[Table, TableReference, TableListItem, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Sequence[str]: """List the partitions in a table. @@ -3669,7 +3679,7 @@ def list_rows( start_index: int = None, page_size: int = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> RowIterator: """List the rows of the table. @@ -3745,7 +3755,7 @@ def list_rows( table = self.get_table(table.reference, retry=retry, timeout=timeout) schema = table.schema - params = {} + params: Dict[str, Any] = {} if selected_fields is not None: params["selectedFields"] = ",".join(field.name for field in selected_fields) if start_index is not None: @@ -3781,7 +3791,7 @@ def _list_rows_from_query_results( start_index: int = None, page_size: int = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> RowIterator: """List the rows of a completed query. See @@ -3826,7 +3836,7 @@ def _list_rows_from_query_results( Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s. """ - params = { + params: Dict[str, Any] = { "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS, "location": location, } @@ -3867,7 +3877,7 @@ def _schema_to_json_file_object(self, schema_list, file_obj): """ json.dump(schema_list, file_obj, indent=2, sort_keys=True) - def schema_from_json(self, file_or_path: Union[str, BinaryIO]): + def schema_from_json(self, file_or_path: "PathType"): """Takes a file object or file path that contains json that describes a table schema. @@ -3881,7 +3891,7 @@ def schema_from_json(self, file_or_path: Union[str, BinaryIO]): return self._schema_from_json_file_object(file_obj) def schema_to_json( - self, schema_list: Sequence[SchemaField], destination: Union[str, BinaryIO] + self, schema_list: Sequence[SchemaField], destination: "PathType" ): """Takes a list of schema field objects. @@ -4023,13 +4033,12 @@ def _extract_job_reference(job, project=None, location=None): return (project, location, job_id) -def _make_job_id(job_id, prefix=None): +def _make_job_id(job_id: Optional[str], prefix: Optional[str] = None) -> str: """Construct an ID for a new job. 
Args: - job_id (Optional[str]): the user-provided job ID. - - prefix (Optional[str]): the user-provided prefix for a job ID. + job_id: the user-provided job ID. + prefix: the user-provided prefix for a job ID. Returns: str: A job ID diff --git a/google/cloud/bigquery/dataset.py b/google/cloud/bigquery/dataset.py index 21e56f305..ff015d605 100644 --- a/google/cloud/bigquery/dataset.py +++ b/google/cloud/bigquery/dataset.py @@ -18,7 +18,7 @@ import copy -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud.bigquery import _helpers from google.cloud.bigquery.model import ModelReference diff --git a/google/cloud/bigquery/dbapi/_helpers.py b/google/cloud/bigquery/dbapi/_helpers.py index 72e711bcf..e5c7ef7ec 100644 --- a/google/cloud/bigquery/dbapi/_helpers.py +++ b/google/cloud/bigquery/dbapi/_helpers.py @@ -161,7 +161,7 @@ def _parse_struct_fields( yield m.group(1, 2) -SCALAR, ARRAY, STRUCT = "sar" +SCALAR, ARRAY, STRUCT = ("s", "a", "r") def _parse_type( @@ -226,19 +226,19 @@ def complex_query_parameter_type(name: typing.Optional[str], type_: str, base: s type_type, sub_type = _parse_type(type_, name, base) if type_type == SCALAR: - type_ = sub_type + result_type = sub_type elif type_type == ARRAY: - type_ = query.ArrayQueryParameterType(sub_type, name=name) + result_type = query.ArrayQueryParameterType(sub_type, name=name) elif type_type == STRUCT: fields = [ complex_query_parameter_type(field_name, field_type, base) for field_name, field_type in sub_type ] - type_ = query.StructQueryParameterType(*fields, name=name) + result_type = query.StructQueryParameterType(*fields, name=name) else: # pragma: NO COVER raise AssertionError("Bad type_type", type_type) # Can't happen :) - return type_ + return result_type def complex_query_parameter( @@ -256,6 +256,12 @@ def complex_query_parameter( struct>> """ + param: typing.Union[ + query.ScalarQueryParameter, + query.ArrayQueryParameter, + query.StructQueryParameter, + ] + base = base or type_ type_type, sub_type = _parse_type(type_, name, base) diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py index b1239ff57..03f3b72ca 100644 --- a/google/cloud/bigquery/dbapi/cursor.py +++ b/google/cloud/bigquery/dbapi/cursor.py @@ -31,7 +31,7 @@ from google.cloud.bigquery import job from google.cloud.bigquery.dbapi import _helpers from google.cloud.bigquery.dbapi import exceptions -import google.cloud.exceptions +import google.cloud.exceptions # type: ignore _LOGGER = logging.getLogger(__name__) diff --git a/google/cloud/bigquery/external_config.py b/google/cloud/bigquery/external_config.py index 5f284c639..e6f6a97c3 100644 --- a/google/cloud/bigquery/external_config.py +++ b/google/cloud/bigquery/external_config.py @@ -556,6 +556,10 @@ def from_api_repr(cls, resource: dict) -> "GoogleSheetsOptions": ParquetOptions, ) +OptionsType = Union[ + AvroOptions, BigtableOptions, CSVOptions, GoogleSheetsOptions, ParquetOptions, +] + class HivePartitioningOptions(object): """[Beta] Options that configure hive partitioning. 
@@ -664,13 +668,15 @@ def source_format(self): return self._properties["sourceFormat"] @property - def options(self) -> Optional[Union[_OPTION_CLASSES]]: + def options(self) -> Optional[OptionsType]: """Source-specific options.""" for optcls in _OPTION_CLASSES: - if self.source_format == optcls._SOURCE_FORMAT: - options = optcls() - self._properties.setdefault(optcls._RESOURCE_NAME, {}) - options._properties = self._properties[optcls._RESOURCE_NAME] + # The code below is too much magic for mypy to handle. + if self.source_format == optcls._SOURCE_FORMAT: # type: ignore + options: OptionsType = optcls() # type: ignore + options._properties = self._properties.setdefault( + optcls._RESOURCE_NAME, {} # type: ignore + ) return options # No matching source format found. @@ -799,6 +805,13 @@ def schema(self): prop = self._properties.get("schema", {}) return [SchemaField.from_api_repr(field) for field in prop.get("fields", [])] + @schema.setter + def schema(self, value): + prop = value + if value is not None: + prop = {"fields": [field.to_api_repr() for field in value]} + self._properties["schema"] = prop + @property def connection_id(self): """Optional[str]: [Experimental] ID of a BigQuery Connection API @@ -816,13 +829,6 @@ def connection_id(self): def connection_id(self, value): self._properties["connectionId"] = value - @schema.setter - def schema(self, value): - prop = value - if value is not None: - prop = {"fields": [field.to_api_repr() for field in value]} - self._properties["schema"] = prop - @property def avro_options(self) -> Optional[AvroOptions]: """Additional properties to set if ``sourceFormat`` is set to AVRO. diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 88d6bec14..97acab5d2 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -696,7 +696,7 @@ def done( self.reload(retry=retry, timeout=timeout) return self.state == _DONE_STATE - def result( + def result( # type: ignore # (signature complaint) self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None ) -> "_AsyncJob": """Start the job and wait for it to complete and get the result. @@ -921,7 +921,7 @@ def from_api_repr(cls, resource: dict) -> "_JobConfig": # cls is one of the job config subclasses that provides the job_type argument to # this base class on instantiation, thus missing-parameter warning is a false # positive here. - job_config = cls() # pytype: disable=missing-parameter + job_config = cls() # type: ignore # pytype: disable=missing-parameter job_config._properties = resource return job_config diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 942c85fc3..36e388238 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -56,9 +56,9 @@ if typing.TYPE_CHECKING: # pragma: NO COVER # Assumption: type checks are only used by library developers and CI environments # that have all optional dependencies installed, thus no conditional imports. 
- import pandas - import geopandas - import pyarrow + import pandas # type: ignore + import geopandas # type: ignore + import pyarrow # type: ignore from google.api_core import retry as retries from google.cloud import bigquery_storage from google.cloud.bigquery.client import Client @@ -144,7 +144,7 @@ def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats": args = ( int(stats.get(api_field, default_val)) - for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) + for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) # type: ignore ) return cls(*args) @@ -161,7 +161,7 @@ def __init__( statement_byte_budget: Optional[int] = None, key_result_statement: Optional[KeyResultStatementKind] = None, ): - self._properties = {} + self._properties: Dict[str, Any] = {} self.statement_timeout_ms = statement_timeout_ms self.statement_byte_budget = statement_byte_budget self.key_result_statement = key_result_statement @@ -193,9 +193,8 @@ def statement_timeout_ms(self) -> Union[int, None]: @statement_timeout_ms.setter def statement_timeout_ms(self, value: Union[int, None]): - if value is not None: - value = str(value) - self._properties["statementTimeoutMs"] = value + new_value = None if value is None else str(value) + self._properties["statementTimeoutMs"] = new_value @property def statement_byte_budget(self) -> Union[int, None]: @@ -207,9 +206,8 @@ def statement_byte_budget(self) -> Union[int, None]: @statement_byte_budget.setter def statement_byte_budget(self, value: Union[int, None]): - if value is not None: - value = str(value) - self._properties["statementByteBudget"] = value + new_value = None if value is None else str(value) + self._properties["statementByteBudget"] = new_value @property def key_result_statement(self) -> Union[KeyResultStatementKind, None]: @@ -666,9 +664,8 @@ def script_options(self) -> ScriptOptions: @script_options.setter def script_options(self, value: Union[ScriptOptions, None]): - if value is not None: - value = value.to_api_repr() - self._set_sub_prop("scriptOptions", value) + new_value = None if value is None else value.to_api_repr() + self._set_sub_prop("scriptOptions", new_value) def to_api_repr(self) -> dict: """Build an API representation of the query job config. 
@@ -1330,7 +1327,7 @@ def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None): except exceptions.GoogleAPIError as exc: self.set_exception(exc) - def result( + def result( # type: ignore # (complaints about the overloaded signature) self, page_size: int = None, max_results: int = None, @@ -1400,7 +1397,7 @@ def result( retry_do_query = getattr(self, "_retry_do_query", None) if retry_do_query is not None: if job_retry is DEFAULT_JOB_RETRY: - job_retry = self._job_retry + job_retry = self._job_retry # type: ignore else: if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY: raise TypeError( @@ -1451,7 +1448,7 @@ def do_get_result(): except exceptions.GoogleAPICallError as exc: exc.message += self._format_for_exception(self.query, self.job_id) - exc.query_job = self + exc.query_job = self # type: ignore raise except requests.exceptions.Timeout as exc: raise concurrent.futures.TimeoutError from exc diff --git a/google/cloud/bigquery/magics/line_arg_parser/lexer.py b/google/cloud/bigquery/magics/line_arg_parser/lexer.py index cd809c389..71b287d01 100644 --- a/google/cloud/bigquery/magics/line_arg_parser/lexer.py +++ b/google/cloud/bigquery/magics/line_arg_parser/lexer.py @@ -98,7 +98,7 @@ def _generate_next_value_(name, start, count, last_values): return name -TokenType = AutoStrEnum( # pytype: disable=wrong-arg-types +TokenType = AutoStrEnum( # type: ignore # pytype: disable=wrong-arg-types "TokenType", [ (name, enum.auto()) diff --git a/google/cloud/bigquery/magics/magics.py b/google/cloud/bigquery/magics/magics.py index ec0430518..1d8d8ed30 100644 --- a/google/cloud/bigquery/magics/magics.py +++ b/google/cloud/bigquery/magics/magics.py @@ -90,16 +90,16 @@ from concurrent import futures try: - import IPython - from IPython import display - from IPython.core import magic_arguments + import IPython # type: ignore + from IPython import display # type: ignore + from IPython.core import magic_arguments # type: ignore except ImportError: # pragma: NO COVER raise ImportError("This module can only be loaded in IPython.") from google.api_core import client_info from google.api_core import client_options from google.api_core.exceptions import NotFound -import google.auth +import google.auth # type: ignore from google.cloud import bigquery import google.cloud.bigquery.dataset from google.cloud.bigquery.dbapi import _helpers diff --git a/google/cloud/bigquery/model.py b/google/cloud/bigquery/model.py index 2d3f6660f..cdb411e08 100644 --- a/google/cloud/bigquery/model.py +++ b/google/cloud/bigquery/model.py @@ -20,8 +20,8 @@ from google.protobuf import json_format -import google.cloud._helpers -from google.api_core import datetime_helpers +import google.cloud._helpers # type: ignore +from google.api_core import datetime_helpers # type: ignore from google.cloud.bigquery import _helpers from google.cloud.bigquery_v2 import types from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration diff --git a/google/cloud/bigquery/opentelemetry_tracing.py b/google/cloud/bigquery/opentelemetry_tracing.py index b1a1027d2..748f2136d 100644 --- a/google/cloud/bigquery/opentelemetry_tracing.py +++ b/google/cloud/bigquery/opentelemetry_tracing.py @@ -14,7 +14,7 @@ import logging from contextlib import contextmanager -from google.api_core.exceptions import GoogleAPICallError +from google.api_core.exceptions import GoogleAPICallError # type: ignore logger = logging.getLogger(__name__) try: diff --git a/google/cloud/bigquery/query.py b/google/cloud/bigquery/query.py index 
708f5f47b..637be62be 100644 --- a/google/cloud/bigquery/query.py +++ b/google/cloud/bigquery/query.py @@ -367,14 +367,14 @@ class _AbstractQueryParameter(object): """ @classmethod - def from_api_repr(cls, resource: dict) -> "ScalarQueryParameter": + def from_api_repr(cls, resource: dict) -> "_AbstractQueryParameter": """Factory: construct parameter from JSON resource. Args: resource (Dict): JSON mapping of parameter Returns: - google.cloud.bigquery.query.ScalarQueryParameter + A new instance of _AbstractQueryParameter subclass. """ raise NotImplementedError @@ -471,7 +471,7 @@ def to_api_repr(self) -> dict: converter = _SCALAR_VALUE_TO_JSON_PARAM.get(self.type_) if converter is not None: value = converter(value) - resource = { + resource: Dict[str, Any] = { "parameterType": {"type": self.type_}, "parameterValue": {"value": value}, } @@ -734,7 +734,7 @@ def from_api_repr(cls, resource: dict) -> "StructQueryParameter": struct_values = resource["parameterValue"]["structValues"] for key, value in struct_values.items(): type_ = types[key] - converted = None + converted: Optional[Union[ArrayQueryParameter, StructQueryParameter]] = None if type_ == "STRUCT": struct_resource = { "name": key, diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 8a86973cd..254b26608 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -14,7 +14,7 @@ from google.api_core import exceptions from google.api_core import retry -from google.auth import exceptions as auth_exceptions +from google.auth import exceptions as auth_exceptions # type: ignore import requests.exceptions diff --git a/google/cloud/bigquery/routine/routine.py b/google/cloud/bigquery/routine/routine.py index a776212c3..a66434300 100644 --- a/google/cloud/bigquery/routine/routine.py +++ b/google/cloud/bigquery/routine/routine.py @@ -18,7 +18,7 @@ from google.protobuf import json_format -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud.bigquery import _helpers import google.cloud.bigquery_v2.types from google.cloud.bigquery_v2.types import StandardSqlTableType diff --git a/google/cloud/bigquery/schema.py b/google/cloud/bigquery/schema.py index 225942234..2af61b672 100644 --- a/google/cloud/bigquery/schema.py +++ b/google/cloud/bigquery/schema.py @@ -16,7 +16,7 @@ import collections import enum -from typing import Iterable, Union +from typing import Any, Dict, Iterable, Union from google.cloud.bigquery_v2 import types @@ -106,7 +106,7 @@ def __init__( scale: Union[int, _DefaultSentinel] = _DEFAULT_VALUE, max_length: Union[int, _DefaultSentinel] = _DEFAULT_VALUE, ): - self._properties = { + self._properties: Dict[str, Any] = { "name": name, "type": field_type, } diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 608218fdc..60c8593c7 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -21,37 +21,37 @@ import functools import operator import typing -from typing import Any, Dict, Iterable, Iterator, Optional, Tuple +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union import warnings try: - import pandas + import pandas # type: ignore except ImportError: # pragma: NO COVER pandas = None try: - import geopandas + import geopandas # type: ignore except ImportError: geopandas = None else: _COORDINATE_REFERENCE_SYSTEM = "EPSG:4326" try: - import shapely.geos + import shapely.geos # type: ignore except ImportError: shapely = None else: _read_wkt = 
shapely.geos.WKTReader(shapely.geos.lgeos).read try: - import pyarrow + import pyarrow # type: ignore except ImportError: # pragma: NO COVER pyarrow = None import google.api_core.exceptions from google.api_core.page_iterator import HTTPIterator -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud.bigquery import _helpers from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError @@ -130,7 +130,7 @@ def _view_use_legacy_sql_getter(table): class _TableBase: """Base class for Table-related classes with common functionality.""" - _PROPERTY_TO_API_FIELD = { + _PROPERTY_TO_API_FIELD: Dict[str, Union[str, List[str]]] = { "dataset_id": ["tableReference", "datasetId"], "project": ["tableReference", "projectId"], "table_id": ["tableReference", "tableId"], @@ -807,7 +807,7 @@ def view_query(self): view_use_legacy_sql = property(_view_use_legacy_sql_getter) - @view_use_legacy_sql.setter + @view_use_legacy_sql.setter # type: ignore # (redefinition from above) def view_use_legacy_sql(self, value): if not isinstance(value, bool): raise ValueError("Pass a boolean") @@ -1746,7 +1746,7 @@ def to_arrow( progress_bar.close() finally: if owns_bqstorage_client: - bqstorage_client._transport.grpc_channel.close() + bqstorage_client._transport.grpc_channel.close() # type: ignore if record_batches and bqstorage_client is not None: return pyarrow.Table.from_batches(record_batches) @@ -1763,7 +1763,7 @@ def to_dataframe_iterable( self, bqstorage_client: "bigquery_storage.BigQueryReadClient" = None, dtypes: Dict[str, Any] = None, - max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, + max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore ) -> "pandas.DataFrame": """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -2307,8 +2307,6 @@ def __repr__(self): key_vals = ["{}={}".format(key, val) for key, val in self._key()] return "PartitionRange({})".format(", ".join(key_vals)) - __hash__ = None - class RangePartitioning(object): """Range-based partitioning configuration for a table. @@ -2387,8 +2385,6 @@ def __repr__(self): key_vals = ["{}={}".format(key, repr(val)) for key, val in self._key()] return "RangePartitioning({})".format(", ".join(key_vals)) - __hash__ = None - class TimePartitioningType(object): """Specifies the type of time partitioning to perform.""" @@ -2657,7 +2653,7 @@ def _rows_page_start(iterator, page, response): # pylint: enable=unused-argument -def _table_arg_to_table_ref(value, default_project=None): +def _table_arg_to_table_ref(value, default_project=None) -> TableReference: """Helper to convert a string or Table to TableReference. This function keeps TableReference and other kinds of objects unchanged. @@ -2669,7 +2665,7 @@ def _table_arg_to_table_ref(value, default_project=None): return value -def _table_arg_to_table(value, default_project=None): +def _table_arg_to_table(value, default_project=None) -> Table: """Helper to convert a string or TableReference to a Table. This function keeps Table and other kinds of objects unchanged. 
diff --git a/noxfile.py b/noxfile.py index 1879a5cd8..505911861 100644 --- a/noxfile.py +++ b/noxfile.py @@ -22,6 +22,7 @@ import nox +MYPY_VERSION = "mypy==0.910" PYTYPE_VERSION = "pytype==2021.4.9" BLACK_VERSION = "black==19.10b0" BLACK_PATHS = ("docs", "google", "samples", "tests", "noxfile.py", "setup.py") @@ -41,6 +42,7 @@ "lint", "lint_setup_py", "blacken", + "mypy", "pytype", "docs", ] @@ -113,9 +115,24 @@ def unit_noextras(session): default(session, install_extras=False) +@nox.session(python=DEFAULT_PYTHON_VERSION) +def mypy(session): + """Run type checks with mypy.""" + session.install("-e", ".[all]") + session.install("ipython") + session.install(MYPY_VERSION) + + # Just install the dependencies' type info directly, since "mypy --install-types" + # might require an additional pass. + session.install( + "types-protobuf", "types-python-dateutil", "types-requests", "types-setuptools", + ) + session.run("mypy", "google/cloud") + + @nox.session(python=DEFAULT_PYTHON_VERSION) def pytype(session): - """Run type checks.""" + """Run type checks with pytype.""" # An indirect dependecy attrs==21.1.0 breaks the check, and installing a less # recent version avoids the error until a possibly better fix is found. # https://github.com/googleapis/python-bigquery/issues/655 diff --git a/tests/unit/test_opentelemetry_tracing.py b/tests/unit/test_opentelemetry_tracing.py index cc1ca7903..3021a3dbf 100644 --- a/tests/unit/test_opentelemetry_tracing.py +++ b/tests/unit/test_opentelemetry_tracing.py @@ -51,9 +51,16 @@ def setup(): memory_exporter = InMemorySpanExporter() span_processor = SimpleSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) - trace.set_tracer_provider(tracer_provider) + + # OpenTelemetry API >= 0.12b0 does not allow overriding the tracer once + # initialized, thus directly override (and then restore) the internal global var. 
+ orig_trace_provider = trace._TRACER_PROVIDER + trace._TRACER_PROVIDER = tracer_provider + yield memory_exporter + trace._TRACER_PROVIDER = orig_trace_provider + @pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") def test_opentelemetry_not_installed(setup, monkeypatch): From 9a5a888bad569c8c1d8d72994d428ed92fe5ac47 Mon Sep 17 00:00:00 2001 From: "gcf-owl-bot[bot]" <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Date: Wed, 10 Nov 2021 11:50:58 -0600 Subject: [PATCH 03/10] chore: use gapic-generator-python 0.56.2 (#1056) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: update Java and Python dependencies PiperOrigin-RevId: 408420890 Source-Link: https://github.com/googleapis/googleapis/commit/2921f9fb3bfbd16f6b2da0104373e2b47a80a65e Source-Link: https://github.com/googleapis/googleapis-gen/commit/6598ca8cbbf5226733a099c4506518a5af6ff74c Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiNjU5OGNhOGNiYmY1MjI2NzMzYTA5OWM0NTA2NTE4YTVhZjZmZjc0YyJ9 * 🦉 Updates from OwlBot See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- google/cloud/bigquery_v2/types/model.py | 8 ++++++++ google/cloud/bigquery_v2/types/standard_sql.py | 2 ++ 2 files changed, 10 insertions(+) diff --git a/google/cloud/bigquery_v2/types/model.py b/google/cloud/bigquery_v2/types/model.py index a56b21491..440bc0805 100644 --- a/google/cloud/bigquery_v2/types/model.py +++ b/google/cloud/bigquery_v2/types/model.py @@ -573,9 +573,11 @@ class FeatureValue(proto.Message): numerical_value (google.protobuf.wrappers_pb2.DoubleValue): The numerical feature value. This is the centroid value for this feature. + This field is a member of `oneof`_ ``value``. categorical_value (google.cloud.bigquery_v2.types.Model.ClusteringMetrics.Cluster.FeatureValue.CategoricalValue): The categorical feature value. + This field is a member of `oneof`_ ``value``. """ @@ -804,24 +806,30 @@ class EvaluationMetrics(proto.Message): regression_metrics (google.cloud.bigquery_v2.types.Model.RegressionMetrics): Populated for regression models and explicit feedback type matrix factorization models. + This field is a member of `oneof`_ ``metrics``. binary_classification_metrics (google.cloud.bigquery_v2.types.Model.BinaryClassificationMetrics): Populated for binary classification/classifier models. + This field is a member of `oneof`_ ``metrics``. multi_class_classification_metrics (google.cloud.bigquery_v2.types.Model.MultiClassClassificationMetrics): Populated for multi-class classification/classifier models. + This field is a member of `oneof`_ ``metrics``. clustering_metrics (google.cloud.bigquery_v2.types.Model.ClusteringMetrics): Populated for clustering models. + This field is a member of `oneof`_ ``metrics``. ranking_metrics (google.cloud.bigquery_v2.types.Model.RankingMetrics): Populated for implicit feedback type matrix factorization models. + This field is a member of `oneof`_ ``metrics``. arima_forecasting_metrics (google.cloud.bigquery_v2.types.Model.ArimaForecastingMetrics): Populated for ARIMA models. + This field is a member of `oneof`_ ``metrics``. 
""" diff --git a/google/cloud/bigquery_v2/types/standard_sql.py b/google/cloud/bigquery_v2/types/standard_sql.py index d6c133634..e10619482 100644 --- a/google/cloud/bigquery_v2/types/standard_sql.py +++ b/google/cloud/bigquery_v2/types/standard_sql.py @@ -49,10 +49,12 @@ class StandardSqlDataType(proto.Message): "INT64", "DATE", "ARRAY"). array_element_type (google.cloud.bigquery_v2.types.StandardSqlDataType): The type of the array's elements, if type_kind = "ARRAY". + This field is a member of `oneof`_ ``sub_type``. struct_type (google.cloud.bigquery_v2.types.StandardSqlStructType): The fields of this struct, in order, if type_kind = "STRUCT". + This field is a member of `oneof`_ ``sub_type``. """ From 942930e4e8972c8a8161e94ac01633c5754e60f6 Mon Sep 17 00:00:00 2001 From: Dan Lee <71398022+dandhlee@users.noreply.github.com> Date: Tue, 16 Nov 2021 03:23:22 -0500 Subject: [PATCH 04/10] chore: update doc links from googleapis.dev to cloud.google.com (#1062) --- .repo-metadata.json | 2 +- README.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.repo-metadata.json b/.repo-metadata.json index 124b40eb9..48cc05719 100644 --- a/.repo-metadata.json +++ b/.repo-metadata.json @@ -2,7 +2,7 @@ "name": "bigquery", "name_pretty": "Google Cloud BigQuery", "product_documentation": "https://cloud.google.com/bigquery", - "client_documentation": "https://googleapis.dev/python/bigquery/latest", + "client_documentation": "https://cloud.google.com/python/docs/reference/bigquery/latest", "issue_tracker": "https://issuetracker.google.com/savedsearches/559654", "release_level": "ga", "language": "python", diff --git a/README.rst b/README.rst index d0ad059a2..7b890e87e 100644 --- a/README.rst +++ b/README.rst @@ -18,7 +18,7 @@ processing power of Google's infrastructure. .. |versions| image:: https://img.shields.io/pypi/pyversions/google-cloud-bigquery.svg :target: https://pypi.org/project/google-cloud-bigquery/ .. _BigQuery: https://cloud.google.com/bigquery/what-is-bigquery -.. _Client Library Documentation: https://googleapis.dev/python/bigquery/latest +.. _Client Library Documentation: https://cloud.google.com/python/docs/reference/bigquery/latest .. _Product Documentation: https://cloud.google.com/bigquery/docs/reference/v2/ Quick Start From 705205430527f4c736fa4ad94b58e8cbb625fc11 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 16 Nov 2021 11:20:07 -0600 Subject: [PATCH 05/10] chore: revert update doc links from googleapis.dev to cloud.google.com (#1063) This reverts commit 942930e4e8972c8a8161e94ac01633c5754e60f6. 
--- .repo-metadata.json | 2 +- README.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.repo-metadata.json b/.repo-metadata.json index 48cc05719..124b40eb9 100644 --- a/.repo-metadata.json +++ b/.repo-metadata.json @@ -2,7 +2,7 @@ "name": "bigquery", "name_pretty": "Google Cloud BigQuery", "product_documentation": "https://cloud.google.com/bigquery", - "client_documentation": "https://cloud.google.com/python/docs/reference/bigquery/latest", + "client_documentation": "https://googleapis.dev/python/bigquery/latest", "issue_tracker": "https://issuetracker.google.com/savedsearches/559654", "release_level": "ga", "language": "python", diff --git a/README.rst b/README.rst index 7b890e87e..d0ad059a2 100644 --- a/README.rst +++ b/README.rst @@ -18,7 +18,7 @@ processing power of Google's infrastructure. .. |versions| image:: https://img.shields.io/pypi/pyversions/google-cloud-bigquery.svg :target: https://pypi.org/project/google-cloud-bigquery/ .. _BigQuery: https://cloud.google.com/bigquery/what-is-bigquery -.. _Client Library Documentation: https://cloud.google.com/python/docs/reference/bigquery/latest +.. _Client Library Documentation: https://googleapis.dev/python/bigquery/latest .. _Product Documentation: https://cloud.google.com/bigquery/docs/reference/v2/ Quick Start From 3a681e046819df18118aa0b2b5733416d004c9b3 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 16 Nov 2021 22:57:51 +0100 Subject: [PATCH 06/10] feat: allow cell magic body to be a $variable (#1053) * feat: allow cell magic body to be a $variable * Fix missing indefinitive article in error msg * Adjust test assertion to error message change * Refactor logic for extracting query variable * Explicitly warn about missing query variable name * Thest the query "variable" is not identifier case --- google/cloud/bigquery/magics/magics.py | 23 ++++ tests/unit/test_magics.py | 139 ++++++++++++++++++++++++- 2 files changed, 161 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigquery/magics/magics.py b/google/cloud/bigquery/magics/magics.py index 1d8d8ed30..5af0a3b51 100644 --- a/google/cloud/bigquery/magics/magics.py +++ b/google/cloud/bigquery/magics/magics.py @@ -596,6 +596,29 @@ def _cell_magic(line, query): _handle_error(error, args.destination_var) return + # Check if query is given as a reference to a variable. + if query.startswith("$"): + query_var_name = query[1:] + + if not query_var_name: + missing_msg = 'Missing query variable name, empty "$" is not allowed.' + raise NameError(missing_msg) + + if query_var_name.isidentifier(): + ip = IPython.get_ipython() + query = ip.user_ns.get(query_var_name, ip) # ip serves as a sentinel + + if query is ip: + raise NameError( + f"Unknown query, variable {query_var_name} does not exist." + ) + else: + if not isinstance(query, (str, bytes)): + raise TypeError( + f"Query variable {query_var_name} must be a string " + "or a bytes-like value." 
+ ) + # Any query that does not contain whitespace (aside from leading and trailing whitespace) # is assumed to be a table id if not re.search(r"\s", query): diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py index 36cbf4993..e18d04d64 100644 --- a/tests/unit/test_magics.py +++ b/tests/unit/test_magics.py @@ -584,7 +584,7 @@ def test_bigquery_magic_does_not_clear_display_in_verbose_mode(): @pytest.mark.usefixtures("ipython_interactive") -def test_bigquery_magic_clears_display_in_verbose_mode(): +def test_bigquery_magic_clears_display_in_non_verbose_mode(): ip = IPython.get_ipython() ip.extension_manager.load_extension("google.cloud.bigquery") magics.context.credentials = mock.create_autospec( @@ -1710,6 +1710,143 @@ def test_bigquery_magic_with_improperly_formatted_params(): ip.run_cell_magic("bigquery", "--params {17}", sql) +@pytest.mark.parametrize( + "raw_sql", ("SELECT answer AS 42", " \t SELECT answer AS 42 \t ") +) +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_valid_query_in_existing_variable(ipython_ns_cleanup, raw_sql): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + ipython_ns_cleanup.append((ip, "custom_query")) + ipython_ns_cleanup.append((ip, "query_results_df")) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + query_job_mock = mock.create_autospec( + google.cloud.bigquery.job.QueryJob, instance=True + ) + mock_result = pandas.DataFrame([42], columns=["answer"]) + query_job_mock.to_dataframe.return_value = mock_result + + ip.user_ns["custom_query"] = raw_sql + cell_body = "$custom_query" # Referring to an existing variable name (custom_query) + assert "query_results_df" not in ip.user_ns + + with run_query_patch as run_query_mock: + run_query_mock.return_value = query_job_mock + + ip.run_cell_magic("bigquery", "query_results_df", cell_body) + + run_query_mock.assert_called_once_with(mock.ANY, raw_sql, mock.ANY) + + assert "query_results_df" in ip.user_ns # verify that the variable exists + df = ip.user_ns["query_results_df"] + assert len(df) == len(mock_result) # verify row count + assert list(df) == list(mock_result) # verify column names + assert list(df["answer"]) == [42] + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_nonexisting_query_variable(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + + ip.user_ns.pop("custom_query", None) # Make sure the variable does NOT exist. + cell_body = "$custom_query" # Referring to a non-existing variable name. 
+ + with pytest.raises( + NameError, match=r".*custom_query does not exist.*" + ), run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "", cell_body) + + run_query_mock.assert_not_called() + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_empty_query_variable_name(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + cell_body = "$" # Not referring to any variable (name omitted). + + with pytest.raises( + NameError, match=r"(?i).*missing query variable name.*" + ), run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "", cell_body) + + run_query_mock.assert_not_called() + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_query_variable_non_string(ipython_ns_cleanup): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + + ipython_ns_cleanup.append((ip, "custom_query")) + + ip.user_ns["custom_query"] = object() + cell_body = "$custom_query" # Referring to a non-string variable. + + with pytest.raises( + TypeError, match=r".*must be a string or a bytes-like.*" + ), run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "", cell_body) + + run_query_mock.assert_not_called() + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_query_variable_not_identifier(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + cell_body = "$123foo" # 123foo is not valid Python identifier + + with io.capture_output() as captured_io: + ip.run_cell_magic("bigquery", "", cell_body) + + # If "$" prefixes a string that is not a Python identifier, we do not treat such + # cell_body as a variable reference and just treat is as any other cell body input. + # If at the same time the cell body does not contain any whitespace, it is + # considered a table name, thus we expect an error that the table ID is not valid. 
+ output = captured_io.stderr + assert "ERROR:" in output + assert "must be a fully-qualified ID" in output + + @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_bigquery_magic_with_invalid_multiple_option_values(): From 1b5dc5ce1c6419233d9564aa32edbdee523f0f10 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 18 Nov 2021 08:22:57 +0100 Subject: [PATCH 07/10] cleanup: silence non-relevant system test warnings (#1068) --- tests/system/test_pandas.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py index 1f43a369a..1541dd3b9 100644 --- a/tests/system/test_pandas.py +++ b/tests/system/test_pandas.py @@ -20,6 +20,7 @@ import json import io import operator +import warnings import google.api_core.retry import pkg_resources @@ -976,9 +977,17 @@ def test_to_geodataframe(bigquery_client, dataset_id): assert df["geog"][2] == wkt.loads("point(0 0)") assert isinstance(df, geopandas.GeoDataFrame) assert isinstance(df["geog"], geopandas.GeoSeries) - assert df.area[0] == 0.5 - assert pandas.isna(df.area[1]) - assert df.area[2] == 0.0 + + with warnings.catch_warnings(): + # Computing the area on a GeoDataFrame that uses a geographic Coordinate + # Reference System (CRS) produces a warning that we are not interested in. + # We do not mind if the computed area is incorrect with respect to the + # GeoDataFrame data, as long as it matches the expected "incorrect" value. + warnings.filterwarnings("ignore", category=UserWarning) + assert df.area[0] == 0.5 + assert pandas.isna(df.area[1]) + assert df.area[2] == 0.0 + assert df.crs.srs == "EPSG:4326" assert df.crs.name == "WGS 84" assert df.geog.crs.srs == "EPSG:4326" From 21cd71022d60c32104f8f90ee2ca445fbb43f7f3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 19 Nov 2021 22:40:42 +0000 Subject: [PATCH 08/10] feat: promote `RowIterator.to_arrow_iterable` to public method (#1073) * feat: promote `to_arrow_iterable` to public method * use correct version number * Update google/cloud/bigquery/table.py Co-authored-by: Tim Swast --- google/cloud/bigquery/_pandas_helpers.py | 8 +- google/cloud/bigquery/table.py | 75 +++++++- tests/unit/test_table.py | 218 +++++++++++++++++++++++ 3 files changed, 297 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index de6356c2a..263a1a9cf 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -838,7 +838,12 @@ def _download_table_bqstorage( def download_arrow_bqstorage( - project_id, table, bqstorage_client, preserve_order=False, selected_fields=None, + project_id, + table, + bqstorage_client, + preserve_order=False, + selected_fields=None, + max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, ): return _download_table_bqstorage( project_id, @@ -847,6 +852,7 @@ def download_arrow_bqstorage( preserve_order=preserve_order, selected_fields=selected_fields, page_to_item=_bqstorage_page_to_arrow, + max_queue_size=max_queue_size, ) diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 60c8593c7..a0696f83f 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1629,8 +1629,49 @@ def _to_page_iterable( ) yield from result_pages - def _to_arrow_iterable(self, bqstorage_client=None): - """Create an iterable of arrow RecordBatches, to process the table as a 
stream.""" + def to_arrow_iterable( + self, + bqstorage_client: "bigquery_storage.BigQueryReadClient" = None, + max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore + ) -> Iterator["pyarrow.RecordBatch"]: + """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream. + + Args: + bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]): + A BigQuery Storage API client. If supplied, use the faster + BigQuery Storage API to fetch rows from BigQuery. + + This method requires the ``pyarrow`` and + ``google-cloud-bigquery-storage`` libraries. + + This method only exposes a subset of the capabilities of the + BigQuery Storage API. For full access to all features + (projections, filters, snapshots) use the Storage API directly. + + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + By default, the max queue size is set to the number of BQ Storage streams + created by the server. If ``max_queue_size`` is :data:`None`, the queue + size is infinite. + + Returns: + pyarrow.RecordBatch: + A generator of :class:`~pyarrow.RecordBatch`. + + Raises: + ValueError: + If the :mod:`pyarrow` library cannot be imported. + + .. versionadded:: 2.31.0 + """ + if pyarrow is None: + raise ValueError(_NO_PYARROW_ERROR) + + self._maybe_warn_max_results(bqstorage_client) + bqstorage_download = functools.partial( _pandas_helpers.download_arrow_bqstorage, self._project, @@ -1638,6 +1679,7 @@ def _to_arrow_iterable(self, bqstorage_client=None): bqstorage_client, preserve_order=self._preserve_order, selected_fields=self._selected_fields, + max_queue_size=max_queue_size, ) tabledata_list_download = functools.partial( _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema @@ -1729,7 +1771,7 @@ def to_arrow( ) record_batches = [] - for record_batch in self._to_arrow_iterable( + for record_batch in self.to_arrow_iterable( bqstorage_client=bqstorage_client ): record_batches.append(record_batch) @@ -2225,6 +2267,33 @@ def to_dataframe_iterable( raise ValueError(_NO_PANDAS_ERROR) return iter((pandas.DataFrame(),)) + def to_arrow_iterable( + self, + bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, + max_queue_size: Optional[int] = None, + ) -> Iterator["pyarrow.RecordBatch"]: + """Create an iterable of pandas DataFrames, to process the table as a stream. + + .. versionadded:: 2.31.0 + + Args: + bqstorage_client: + Ignored. Added for compatibility with RowIterator. + + max_queue_size: + Ignored. Added for compatibility with RowIterator. + + Returns: + An iterator yielding a single empty :class:`~pyarrow.RecordBatch`. + + Raises: + ValueError: + If the :mod:`pyarrow` library cannot be imported. 
+ """ + if pyarrow is None: + raise ValueError(_NO_PYARROW_ERROR) + return iter((pyarrow.record_batch([]),)) + def __iter__(self): return iter(()) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 3c68e3c5e..4f45eac3d 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1840,6 +1840,25 @@ def test_to_arrow(self): self.assertIsInstance(tbl, pyarrow.Table) self.assertEqual(tbl.num_rows, 0) + @mock.patch("google.cloud.bigquery.table.pyarrow", new=None) + def test_to_arrow_iterable_error_if_pyarrow_is_none(self): + row_iterator = self._make_one() + with self.assertRaises(ValueError): + row_iterator.to_arrow_iterable() + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_iterable(self): + row_iterator = self._make_one() + arrow_iter = row_iterator.to_arrow_iterable() + + result = list(arrow_iter) + + self.assertEqual(len(result), 1) + record_batch = result[0] + self.assertIsInstance(record_batch, pyarrow.RecordBatch) + self.assertEqual(record_batch.num_rows, 0) + self.assertEqual(record_batch.num_columns, 0) + @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_error_if_pandas_is_none(self): row_iterator = self._make_one() @@ -2151,6 +2170,205 @@ def test__validate_bqstorage_returns_false_w_warning_if_obsolete_version(self): ] assert matching_warnings, "Obsolete dependency warning not raised." + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_iterable(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + SchemaField( + "child", + "RECORD", + mode="REPEATED", + fields=[ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ], + ), + ] + rows = [ + { + "f": [ + {"v": "Bharney Rhubble"}, + {"v": "33"}, + { + "v": [ + {"v": {"f": [{"v": "Whamm-Whamm Rhubble"}, {"v": "3"}]}}, + {"v": {"f": [{"v": "Hoppy"}, {"v": "1"}]}}, + ] + }, + ] + }, + { + "f": [ + {"v": "Wylma Phlyntstone"}, + {"v": "29"}, + { + "v": [ + {"v": {"f": [{"v": "Bepples Phlyntstone"}, {"v": "0"}]}}, + {"v": {"f": [{"v": "Dino"}, {"v": "4"}]}}, + ] + }, + ] + }, + ] + path = "/foo" + api_request = mock.Mock( + side_effect=[ + {"rows": [rows[0]], "pageToken": "NEXTPAGE"}, + {"rows": [rows[1]]}, + ] + ) + row_iterator = self._make_one( + _mock_client(), api_request, path, schema, page_size=1, max_results=5 + ) + + record_batches = row_iterator.to_arrow_iterable() + self.assertIsInstance(record_batches, types.GeneratorType) + record_batches = list(record_batches) + self.assertEqual(len(record_batches), 2) + + # Check the schema. + for record_batch in record_batches: + self.assertIsInstance(record_batch, pyarrow.RecordBatch) + self.assertEqual(record_batch.schema[0].name, "name") + self.assertTrue(pyarrow.types.is_string(record_batch.schema[0].type)) + self.assertEqual(record_batch.schema[1].name, "age") + self.assertTrue(pyarrow.types.is_int64(record_batch.schema[1].type)) + child_field = record_batch.schema[2] + self.assertEqual(child_field.name, "child") + self.assertTrue(pyarrow.types.is_list(child_field.type)) + self.assertTrue(pyarrow.types.is_struct(child_field.type.value_type)) + self.assertEqual(child_field.type.value_type[0].name, "name") + self.assertEqual(child_field.type.value_type[1].name, "age") + + # Check the data. 
+ record_batch_1 = record_batches[0].to_pydict() + names = record_batch_1["name"] + ages = record_batch_1["age"] + children = record_batch_1["child"] + self.assertEqual(names, ["Bharney Rhubble"]) + self.assertEqual(ages, [33]) + self.assertEqual( + children, + [ + [ + {"name": "Whamm-Whamm Rhubble", "age": 3}, + {"name": "Hoppy", "age": 1}, + ], + ], + ) + + record_batch_2 = record_batches[1].to_pydict() + names = record_batch_2["name"] + ages = record_batch_2["age"] + children = record_batch_2["child"] + self.assertEqual(names, ["Wylma Phlyntstone"]) + self.assertEqual(ages, [29]) + self.assertEqual( + children, + [[{"name": "Bepples Phlyntstone", "age": 0}, {"name": "Dino", "age": 4}]], + ) + + @mock.patch("google.cloud.bigquery.table.pyarrow", new=None) + def test_to_arrow_iterable_error_if_pyarrow_is_none(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + with pytest.raises(ValueError, match="pyarrow"): + row_iterator.to_arrow_iterable() + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_arrow_iterable_w_bqstorage(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1 import reader + + bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) + bqstorage_client._transport = mock.create_autospec( + big_query_read_grpc_transport.BigQueryReadGrpcTransport + ) + streams = [ + # Use two streams we want to check frames are read from each stream. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage.types.ReadSession(streams=streams) + arrow_schema = pyarrow.schema( + [ + pyarrow.field("colA", pyarrow.int64()), + # Not alphabetical to test column order. 
+ pyarrow.field("colC", pyarrow.float64()), + pyarrow.field("colB", pyarrow.string()), + ] + ) + session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + page_items = [ + pyarrow.array([1, -1]), + pyarrow.array([2.0, 4.0]), + pyarrow.array(["abc", "def"]), + ] + + expected_record_batch = pyarrow.RecordBatch.from_arrays( + page_items, schema=arrow_schema + ) + expected_num_record_batches = 3 + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_arrow.return_value = expected_record_batch + mock_pages = (mock_page,) * expected_num_record_batches + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + schema = [ + schema.SchemaField("colA", "INTEGER"), + schema.SchemaField("colC", "FLOAT"), + schema.SchemaField("colB", "STRING"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + record_batches = list( + row_iterator.to_arrow_iterable(bqstorage_client=bqstorage_client) + ) + total_record_batches = len(streams) * len(mock_pages) + self.assertEqual(len(record_batches), total_record_batches) + + for record_batch in record_batches: + # Are the record batches return as expected? + self.assertEqual(record_batch, expected_record_batch) + + # Don't close the client if it was passed in. + bqstorage_client._transport.grpc_channel.close.assert_not_called() + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow(self): from google.cloud.bigquery.schema import SchemaField From 3314dfbed62488503dc41b11e403a672fcf71048 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 24 Nov 2021 16:12:10 +0100 Subject: [PATCH 09/10] fix: apply timeout to all resumable upload requests (#1070) * fix: apply timeout to all resumable upload requests * Fix stub in test case * Improve timeout type and other type annotations * Annnotate return type of _do_resumable_upload() --- google/cloud/bigquery/client.py | 186 +++++++++++++++++++------------- tests/unit/test_client.py | 18 +++- 2 files changed, 128 insertions(+), 76 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 3e641e195..a5f3d5419 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -31,9 +31,10 @@ import typing from typing import ( Any, - BinaryIO, Dict, + IO, Iterable, + Mapping, List, Optional, Sequence, @@ -112,10 +113,15 @@ pyarrow = _helpers.PYARROW_VERSIONS.try_import() TimeoutType = Union[float, None] +ResumableTimeoutType = Union[ + None, float, Tuple[float, float] +] # for resumable media methods if typing.TYPE_CHECKING: # pragma: NO COVER # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition. 
PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]] + import pandas # type: ignore + import requests # required by api-core _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB _MAX_MULTIPART_SIZE = 5 * 1024 * 1024 @@ -2348,7 +2354,7 @@ def load_table_from_uri( def load_table_from_file( self, - file_obj: BinaryIO, + file_obj: IO[bytes], destination: Union[Table, TableReference, TableListItem, str], rewind: bool = False, size: int = None, @@ -2358,7 +2364,7 @@ def load_table_from_file( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of this table from a file-like object. @@ -2366,42 +2372,42 @@ def load_table_from_file( returns a :class:`~google.cloud.bigquery.job.LoadJob`. Args: - file_obj (file): A file handle opened in binary mode for reading. - destination (Union[ \ - google.cloud.bigquery.table.Table, \ - google.cloud.bigquery.table.TableReference, \ - google.cloud.bigquery.table.TableListItem, \ - str, \ - ]): + file_obj: + A file handle opened in binary mode for reading. + destination: Table into which data is to be loaded. If a string is passed in, this method attempts to create a table reference from a string using :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - rewind (Optional[bool]): + rewind: If True, seek to the beginning of the file handle before reading the file. - size (Optional[int]): + size: The number of bytes to read from the file handle. If size is ``None`` or large, resumable upload will be used. Otherwise, multipart upload will be used. - num_retries (Optional[int]): Number of upload retries. Defaults to 6. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. Defaults to 6. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request + may be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2453,7 +2459,7 @@ def load_table_from_file( def load_table_from_dataframe( self, - dataframe, + dataframe: "pandas.DataFrame", destination: Union[Table, TableReference, str], num_retries: int = _DEFAULT_NUM_RETRIES, job_id: str = None, @@ -2462,7 +2468,7 @@ def load_table_from_dataframe( project: str = None, job_config: LoadJobConfig = None, parquet_compression: str = "snappy", - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a pandas DataFrame. 
@@ -2481,9 +2487,9 @@ def load_table_from_dataframe( https://github.com/googleapis/python-bigquery/issues/19 Args: - dataframe (pandas.DataFrame): + dataframe: A :class:`~pandas.DataFrame` containing the data to load. - destination (google.cloud.bigquery.table.TableReference): + destination: The destination table to use for loading the data. If it is an existing table, the schema of the :class:`~pandas.DataFrame` must match the schema of the destination table. If the table @@ -2495,19 +2501,19 @@ def load_table_from_dataframe( :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - num_retries (Optional[int]): Number of upload retries. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. To override the default pandas data type conversions, supply @@ -2524,7 +2530,7 @@ def load_table_from_dataframe( :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are supported. - parquet_compression (Optional[str]): + parquet_compression: [Beta] The compression method to use if intermittently serializing ``dataframe`` to a parquet file. @@ -2537,9 +2543,13 @@ def load_table_from_dataframe( passed as the ``compression`` argument to the underlying ``DataFrame.to_parquet()`` method. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2717,7 +2727,7 @@ def load_table_from_json( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a JSON string or dict. @@ -2741,36 +2751,35 @@ def load_table_from_json( client = bigquery.Client() client.load_table_from_file(data_as_file, ...) - destination (Union[ \ - google.cloud.bigquery.table.Table, \ - google.cloud.bigquery.table.TableReference, \ - google.cloud.bigquery.table.TableListItem, \ - str, \ - ]): + destination: Table into which data is to be loaded. If a string is passed in, this method attempts to create a table reference from a string using :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - num_retries (Optional[int]): Number of upload retries. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. 
+ job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. The ``source_format`` setting is always set to :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`. - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2819,60 +2828,77 @@ def load_table_from_json( ) def _do_resumable_upload( - self, stream, metadata, num_retries, timeout, project=None - ): + self, + stream: IO[bytes], + metadata: Mapping[str, str], + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, + ) -> "requests.Response": """Perform a resumable upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. Returns: - requests.Response: - The "200 OK" response object returned after the final chunk - is uploaded. + The "200 OK" response object returned after the final chunk + is uploaded. """ upload, transport = self._initiate_resumable_upload( stream, metadata, num_retries, timeout, project=project ) while not upload.finished: - response = upload.transmit_next_chunk(transport) + response = upload.transmit_next_chunk(transport, timeout=timeout) return response def _initiate_resumable_upload( - self, stream, metadata, num_retries, timeout, project=None + self, + stream: IO[bytes], + metadata: Mapping[str, str], + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, ): """Initiate a resumable upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. 
+ before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. @@ -2921,29 +2947,39 @@ def _initiate_resumable_upload( return upload, transport def _do_multipart_upload( - self, stream, metadata, size, num_retries, timeout, project=None + self, + stream: IO[bytes], + metadata: Mapping[str, str], + size: int, + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, ): """Perform a multipart upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - size (int): + size: The number of bytes to be uploaded (which will be read from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 97aa2eedb..9c93765e8 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -8235,6 +8235,22 @@ def test__do_resumable_upload_custom_project(self): assert initiation_url is not None assert "projects/custom-project" in initiation_url + def test__do_resumable_upload_custom_timeout(self): + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + transport = self._make_transport( + self._make_resumable_upload_responses(file_obj_len) + ) + client = self._make_client(transport) + + client._do_resumable_upload( + file_obj, self.EXPECTED_CONFIGURATION, num_retries=0, timeout=3.14 + ) + + # The timeout should be applied to all underlying calls. 
+ for call_args in transport.request.call_args_list: + assert call_args.kwargs.get("timeout") == 3.14 + def test__do_multipart_upload(self): transport = self._make_transport([self._make_response(http.client.OK)]) client = self._make_client(transport) @@ -8442,7 +8458,7 @@ def test_upload_chunksize(client): upload.finished = False - def transmit_next_chunk(transport): + def transmit_next_chunk(transport, *args, **kwargs): upload.finished = True result = mock.MagicMock() result.json.return_value = {} From a1359560d6a54b5c261eadc1559db59b2fa58f1a Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Thu, 2 Dec 2021 14:14:11 -0600 Subject: [PATCH 10/10] chore: release 2.31.0 (#1066) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 18 ++++++++++++++++++ google/cloud/bigquery/version.py | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e10ad826..5ba219d20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ [1]: https://pypi.org/project/google-cloud-bigquery/#history +## [2.31.0](https://www.github.com/googleapis/python-bigquery/compare/v2.30.1...v2.31.0) (2021-11-24) + + +### Features + +* allow cell magic body to be a $variable ([#1053](https://www.github.com/googleapis/python-bigquery/issues/1053)) ([3a681e0](https://www.github.com/googleapis/python-bigquery/commit/3a681e046819df18118aa0b2b5733416d004c9b3)) +* promote `RowIterator.to_arrow_iterable` to public method ([#1073](https://www.github.com/googleapis/python-bigquery/issues/1073)) ([21cd710](https://www.github.com/googleapis/python-bigquery/commit/21cd71022d60c32104f8f90ee2ca445fbb43f7f3)) + + +### Bug Fixes + +* apply timeout to all resumable upload requests ([#1070](https://www.github.com/googleapis/python-bigquery/issues/1070)) ([3314dfb](https://www.github.com/googleapis/python-bigquery/commit/3314dfbed62488503dc41b11e403a672fcf71048)) + + +### Dependencies + +* support OpenTelemetry >= 1.1.0 ([#1050](https://www.github.com/googleapis/python-bigquery/issues/1050)) ([4616cd5](https://www.github.com/googleapis/python-bigquery/commit/4616cd58d3c6da641fb881ce99a87dcdedc20ba2)) + ### [2.30.1](https://www.github.com/googleapis/python-bigquery/compare/v2.30.0...v2.30.1) (2021-11-04) diff --git a/google/cloud/bigquery/version.py b/google/cloud/bigquery/version.py index 877ea53d8..6329658af 100644 --- a/google/cloud/bigquery/version.py +++ b/google/cloud/bigquery/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.30.1" +__version__ = "2.31.0"
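
The following standalone sketch is illustrative only and is not part of any patch above; the dataset, table, and CSV file names are hypothetical placeholders. Under those assumptions, it shows how the user-facing additions summarized in the 2.31.0 changelog (the newly public ``RowIterator.to_arrow_iterable`` and the resumable-upload timeout handling) might be exercised together.

    from google.cloud import bigquery

    client = bigquery.Client()

    # Stream query results as pyarrow.RecordBatch objects with the newly public
    # RowIterator.to_arrow_iterable(), instead of materializing one pyarrow.Table.
    rows = client.query("SELECT name, age FROM `my_dataset.my_table`").result()
    total_rows = 0
    for record_batch in rows.to_arrow_iterable():  # optionally pass bqstorage_client=...
        total_rows += record_batch.num_rows
    print(f"Streamed {total_rows} rows in Arrow record batches.")

    # Resumable uploads now apply the timeout to every chunk request, and a
    # (connect_timeout, read_timeout) tuple is accepted, mirroring
    # requests.Session.request().
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV, autodetect=True
    )
    with open("data.csv", "rb") as source_file:
        load_job = client.load_table_from_file(
            source_file,
            "my_dataset.my_table",
            job_config=job_config,
            timeout=(5.0, 60.0),
        )
    load_job.result()

    # In an IPython/Jupyter session, the cell magic can now read its query from a
    # Python variable: after `%load_ext google.cloud.bigquery` and
    # `my_query = "SELECT 17 AS answer"`, a cell containing only
    #     %%bigquery results_df
    #     $my_query
    # runs the referenced query and stores the resulting DataFrame in `results_df`.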