diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e10ad826..5ba219d20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ [1]: https://pypi.org/project/google-cloud-bigquery/#history +## [2.31.0](https://www.github.com/googleapis/python-bigquery/compare/v2.30.1...v2.31.0) (2021-11-24) + + +### Features + +* allow cell magic body to be a $variable ([#1053](https://www.github.com/googleapis/python-bigquery/issues/1053)) ([3a681e0](https://www.github.com/googleapis/python-bigquery/commit/3a681e046819df18118aa0b2b5733416d004c9b3)) +* promote `RowIterator.to_arrow_iterable` to public method ([#1073](https://www.github.com/googleapis/python-bigquery/issues/1073)) ([21cd710](https://www.github.com/googleapis/python-bigquery/commit/21cd71022d60c32104f8f90ee2ca445fbb43f7f3)) + + +### Bug Fixes + +* apply timeout to all resumable upload requests ([#1070](https://www.github.com/googleapis/python-bigquery/issues/1070)) ([3314dfb](https://www.github.com/googleapis/python-bigquery/commit/3314dfbed62488503dc41b11e403a672fcf71048)) + + +### Dependencies + +* support OpenTelemetry >= 1.1.0 ([#1050](https://www.github.com/googleapis/python-bigquery/issues/1050)) ([4616cd5](https://www.github.com/googleapis/python-bigquery/commit/4616cd58d3c6da641fb881ce99a87dcdedc20ba2)) + ### [2.30.1](https://www.github.com/googleapis/python-bigquery/compare/v2.30.0...v2.30.1) (2021-11-04) diff --git a/google/cloud/__init__.py b/google/cloud/__init__.py index 8fcc60e2b..8e60d8439 100644 --- a/google/cloud/__init__.py +++ b/google/cloud/__init__.py @@ -21,4 +21,4 @@ except ImportError: import pkgutil - __path__ = pkgutil.extend_path(__path__, __name__) + __path__ = pkgutil.extend_path(__path__, __name__) # type: ignore diff --git a/google/cloud/bigquery/_helpers.py b/google/cloud/bigquery/_helpers.py index e95d38545..e2ca7fa07 100644 --- a/google/cloud/bigquery/_helpers.py +++ b/google/cloud/bigquery/_helpers.py @@ -22,7 +22,7 @@ from typing import Any, Optional, Union from dateutil import relativedelta -from google.cloud._helpers import UTC +from google.cloud._helpers import UTC # type: ignore from google.cloud._helpers import _date_from_iso8601_date from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _RFC3339_MICROS @@ -126,7 +126,7 @@ def __init__(self): def installed_version(self) -> packaging.version.Version: """Return the parsed version of pyarrow.""" if self._installed_version is None: - import pyarrow + import pyarrow # type: ignore self._installed_version = packaging.version.parse( # Use 0.0.0, since it is earlier than any released version. 
diff --git a/google/cloud/bigquery/_http.py b/google/cloud/bigquery/_http.py index 81e7922e6..f7207f32e 100644 --- a/google/cloud/bigquery/_http.py +++ b/google/cloud/bigquery/_http.py @@ -17,7 +17,7 @@ import os import pkg_resources -from google.cloud import _http # pytype: disable=import-error +from google.cloud import _http # type: ignore # pytype: disable=import-error from google.cloud.bigquery import __version__ diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 0cb851469..263a1a9cf 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -21,7 +21,7 @@ import warnings try: - import pandas + import pandas # type: ignore except ImportError: # pragma: NO COVER pandas = None else: @@ -29,7 +29,7 @@ try: # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array` - from shapely.geometry.base import BaseGeometry as _BaseGeometry + from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore except ImportError: # pragma: NO COVER # No shapely, use NoneType for _BaseGeometry as a placeholder. _BaseGeometry = type(None) @@ -43,7 +43,7 @@ def _to_wkb(): # - Avoid extra work done by `shapely.wkb.dumps` that we don't need. # - Caches the WKBWriter (and write method lookup :) ) # - Avoids adding WKBWriter, lgeos, and notnull to the module namespace. - from shapely.geos import WKBWriter, lgeos + from shapely.geos import WKBWriter, lgeos # type: ignore write = WKBWriter(lgeos).write notnull = pandas.notnull @@ -574,7 +574,7 @@ def dataframe_to_parquet( """ pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) - import pyarrow.parquet + import pyarrow.parquet # type: ignore kwargs = ( {"use_compliant_nested_type": parquet_use_compliant_nested_type} @@ -838,7 +838,12 @@ def _download_table_bqstorage( def download_arrow_bqstorage( - project_id, table, bqstorage_client, preserve_order=False, selected_fields=None, + project_id, + table, + bqstorage_client, + preserve_order=False, + selected_fields=None, + max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, ): return _download_table_bqstorage( project_id, @@ -847,6 +852,7 @@ def download_arrow_bqstorage( preserve_order=preserve_order, selected_fields=selected_fields, page_to_item=_bqstorage_page_to_arrow, + max_queue_size=max_queue_size, ) diff --git a/google/cloud/bigquery/_tqdm_helpers.py b/google/cloud/bigquery/_tqdm_helpers.py index 99e720e2b..632f70f87 100644 --- a/google/cloud/bigquery/_tqdm_helpers.py +++ b/google/cloud/bigquery/_tqdm_helpers.py @@ -21,7 +21,7 @@ import warnings try: - import tqdm + import tqdm # type: ignore except ImportError: # pragma: NO COVER tqdm = None diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 4bdd43e8f..a5f3d5419 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -28,12 +28,24 @@ import math import os import tempfile -from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union +import typing +from typing import ( + Any, + Dict, + IO, + Iterable, + Mapping, + List, + Optional, + Sequence, + Tuple, + Union, +) import uuid import warnings from google import resumable_media # type: ignore -from google.resumable_media.requests import MultipartUpload +from google.resumable_media.requests import MultipartUpload # type: ignore from google.resumable_media.requests import ResumableUpload import google.api_core.client_options @@ -41,16 +53,16 @@ from google.api_core.iam import Policy from google.api_core
import page_iterator from google.api_core import retry as retries -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud import exceptions # pytype: disable=import-error -from google.cloud.client import ClientWithProject # pytype: disable=import-error +from google.cloud.client import ClientWithProject # type: ignore # pytype: disable=import-error try: from google.cloud.bigquery_storage_v1.services.big_query_read.client import ( DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO, ) except ImportError: - DEFAULT_BQSTORAGE_CLIENT_INFO = None + DEFAULT_BQSTORAGE_CLIENT_INFO = None # type: ignore from google.cloud.bigquery._helpers import _del_sub_prop from google.cloud.bigquery._helpers import _get_sub_prop @@ -100,6 +112,16 @@ pyarrow = _helpers.PYARROW_VERSIONS.try_import() +TimeoutType = Union[float, None] +ResumableTimeoutType = Union[ + None, float, Tuple[float, float] +] # for resumable media methods + +if typing.TYPE_CHECKING: # pragma: NO COVER + # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition. + PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]] + import pandas # type: ignore + import requests # required by api-core _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB _MAX_MULTIPART_SIZE = 5 * 1024 * 1024 @@ -248,7 +270,7 @@ def get_service_account_email( self, project: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> str: """Get the email address of the project's BigQuery service account @@ -295,7 +317,7 @@ def list_projects( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """List projects for the project associated with this client. @@ -361,7 +383,7 @@ def list_datasets( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """List datasets for the project associated with this client. @@ -400,7 +422,7 @@ def list_datasets( Iterator of :class:`~google.cloud.bigquery.dataset.DatasetListItem`. associated with the project. """ - extra_params = {} + extra_params: Dict[str, Any] = {} if project is None: project = self.project if include_all: @@ -526,12 +548,12 @@ def _ensure_bqstorage_client( bqstorage_client = bigquery_storage.BigQueryReadClient( credentials=self._credentials, client_options=client_options, - client_info=client_info, + client_info=client_info, # type: ignore # (None is also accepted) ) return bqstorage_client - def _dataset_from_arg(self, dataset): + def _dataset_from_arg(self, dataset) -> Union[Dataset, DatasetReference]: if isinstance(dataset, str): dataset = DatasetReference.from_string( dataset, default_project=self.project @@ -552,7 +574,7 @@ def create_dataset( dataset: Union[str, Dataset, DatasetReference, DatasetListItem], exists_ok: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dataset: """API call: create the dataset via a POST request. 
@@ -627,7 +649,7 @@ def create_routine( routine: Routine, exists_ok: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Routine: """[Beta] Create a routine via a POST request. @@ -682,7 +704,7 @@ def create_table( table: Union[str, Table, TableReference, TableListItem], exists_ok: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Table: """API call: create a table via a PUT request @@ -765,7 +787,7 @@ def get_dataset( self, dataset_ref: Union[DatasetReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dataset: """Fetch the dataset referenced by ``dataset_ref`` @@ -809,7 +831,7 @@ def get_iam_policy( table: Union[Table, TableReference, TableListItem, str], requested_policy_version: int = 1, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Policy: table = _table_arg_to_table_ref(table, default_project=self.project) @@ -838,7 +860,7 @@ def set_iam_policy( policy: Policy, updateMask: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Policy: table = _table_arg_to_table_ref(table, default_project=self.project) @@ -870,7 +892,7 @@ def test_iam_permissions( table: Union[Table, TableReference, TableListItem, str], permissions: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dict[str, Any]: table = _table_arg_to_table_ref(table, default_project=self.project) @@ -894,7 +916,7 @@ def get_model( self, model_ref: Union[ModelReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Model: """[Beta] Fetch the model referenced by ``model_ref``. @@ -937,7 +959,7 @@ def get_routine( self, routine_ref: Union[Routine, RoutineReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Routine: """[Beta] Get the routine referenced by ``routine_ref``. @@ -981,7 +1003,7 @@ def get_table( self, table: Union[Table, TableReference, TableListItem, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Table: """Fetch the table referenced by ``table``. @@ -1024,7 +1046,7 @@ def update_dataset( dataset: Dataset, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Dataset: """Change some fields of a dataset. @@ -1071,7 +1093,7 @@ def update_dataset( """ partial = dataset._build_resource(fields) if dataset.etag is not None: - headers = {"If-Match": dataset.etag} + headers: Optional[Dict[str, str]] = {"If-Match": dataset.etag} else: headers = None path = dataset.path @@ -1094,7 +1116,7 @@ def update_model( model: Model, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Model: """[Beta] Change some fields of a model. 
@@ -1135,7 +1157,7 @@ def update_model( """ partial = model._build_resource(fields) if model.etag: - headers = {"If-Match": model.etag} + headers: Optional[Dict[str, str]] = {"If-Match": model.etag} else: headers = None path = model.path @@ -1158,7 +1180,7 @@ def update_routine( routine: Routine, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Routine: """[Beta] Change some fields of a routine. @@ -1205,7 +1227,7 @@ def update_routine( """ partial = routine._build_resource(fields) if routine.etag: - headers = {"If-Match": routine.etag} + headers: Optional[Dict[str, str]] = {"If-Match": routine.etag} else: headers = None @@ -1232,7 +1254,7 @@ def update_table( table: Table, fields: Sequence[str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Table: """Change some fields of a table. @@ -1273,7 +1295,7 @@ def update_table( """ partial = table._build_resource(fields) if table.etag is not None: - headers = {"If-Match": table.etag} + headers: Optional[Dict[str, str]] = {"If-Match": table.etag} else: headers = None @@ -1298,7 +1320,7 @@ def list_models( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """[Beta] List models in the dataset. @@ -1366,7 +1388,7 @@ def api_request(*args, **kwargs): max_results=max_results, page_size=page_size, ) - result.dataset = dataset + result.dataset = dataset # type: ignore return result def list_routines( @@ -1375,7 +1397,7 @@ def list_routines( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """[Beta] List routines in the dataset. @@ -1443,7 +1465,7 @@ def api_request(*args, **kwargs): max_results=max_results, page_size=page_size, ) - result.dataset = dataset + result.dataset = dataset # type: ignore return result def list_tables( @@ -1452,7 +1474,7 @@ def list_tables( max_results: int = None, page_token: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, page_size: int = None, ) -> page_iterator.Iterator: """List tables in the dataset. @@ -1519,7 +1541,7 @@ def api_request(*args, **kwargs): max_results=max_results, page_size=page_size, ) - result.dataset = dataset + result.dataset = dataset # type: ignore return result def delete_dataset( @@ -1527,7 +1549,7 @@ def delete_dataset( dataset: Union[Dataset, DatasetReference, DatasetListItem, str], delete_contents: bool = False, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """Delete a dataset. 
@@ -1586,7 +1608,7 @@ def delete_model( self, model: Union[Model, ModelReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """[Beta] Delete a model @@ -1640,7 +1662,7 @@ def delete_job_metadata( project: Optional[str] = None, location: Optional[str] = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ): """[Beta] Delete job metadata from job history. @@ -1703,7 +1725,7 @@ def delete_routine( self, routine: Union[Routine, RoutineReference, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """[Beta] Delete a routine. @@ -1757,7 +1779,7 @@ def delete_table( self, table: Union[Table, TableReference, TableListItem, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, not_found_ok: bool = False, ) -> None: """Delete a table @@ -1811,7 +1833,7 @@ def _get_query_results( project: str = None, timeout_ms: int = None, location: str = None, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> _QueryResults: """Get the query results object for a query job. @@ -1836,7 +1858,7 @@ def _get_query_results( A new ``_QueryResults`` instance. """ - extra_params = {"maxResults": 0} + extra_params: Dict[str, Any] = {"maxResults": 0} if timeout is not None: timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT) @@ -1870,20 +1892,18 @@ def _get_query_results( ) return _QueryResults.from_api_repr(resource) - def job_from_resource(self, resource: dict) -> job.UnknownJob: + def job_from_resource( + self, resource: dict + ) -> Union[ + job.CopyJob, job.ExtractJob, job.LoadJob, job.QueryJob, job.UnknownJob, + ]: """Detect correct job type from resource and instantiate. Args: resource (Dict): one job resource from API response Returns: - Union[ \ - google.cloud.bigquery.job.LoadJob, \ - google.cloud.bigquery.job.CopyJob, \ - google.cloud.bigquery.job.ExtractJob, \ - google.cloud.bigquery.job.QueryJob \ - ]: - The job instance, constructed via the resource. + The job instance, constructed via the resource. """ config = resource.get("configuration", {}) if "load" in config: @@ -1900,7 +1920,7 @@ def create_job( self, job_config: dict, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]: """Create a new job. 
Args: @@ -1933,7 +1953,7 @@ def create_job( return self.load_table_from_uri( source_uris, destination, - job_config=load_job_config, + job_config=typing.cast(LoadJobConfig, load_job_config), retry=retry, timeout=timeout, ) @@ -1953,7 +1973,7 @@ def create_job( return self.copy_table( sources, destination, - job_config=copy_job_config, + job_config=typing.cast(CopyJobConfig, copy_job_config), retry=retry, timeout=timeout, ) @@ -1973,7 +1993,7 @@ def create_job( return self.extract_table( source, destination_uris, - job_config=extract_job_config, + job_config=typing.cast(ExtractJobConfig, extract_job_config), retry=retry, timeout=timeout, source_type=source_type, @@ -1986,32 +2006,30 @@ def create_job( ) query = _get_sub_prop(copy_config, ["query", "query"]) return self.query( - query, job_config=query_job_config, retry=retry, timeout=timeout + query, + job_config=typing.cast(QueryJobConfig, query_job_config), + retry=retry, + timeout=timeout, ) else: raise TypeError("Invalid job configuration received.") def get_job( self, - job_id: str, + job_id: Union[str, job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob], project: str = None, location: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, - ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]: + timeout: TimeoutType = DEFAULT_TIMEOUT, + ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]: """Fetch a job for the project associated with this client. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get Args: - job_id (Union[ \ - str, \ - google.cloud.bigquery.job.LoadJob, \ - google.cloud.bigquery.job.CopyJob, \ - google.cloud.bigquery.job.ExtractJob, \ - google.cloud.bigquery.job.QueryJob \ - ]): Job identifier. + job_id: + Job identifier. Keyword Arguments: project (Optional[str]): @@ -2026,13 +2044,7 @@ def get_job( before using ``retry``. Returns: - Union[ \ - google.cloud.bigquery.job.LoadJob, \ - google.cloud.bigquery.job.CopyJob, \ - google.cloud.bigquery.job.ExtractJob, \ - google.cloud.bigquery.job.QueryJob \ - ]: - Job instance, based on the resource returned by the API. + Job instance, based on the resource returned by the API. """ extra_params = {"projection": "full"} @@ -2071,7 +2083,7 @@ def cancel_job( project: str = None, location: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob]: """Attempt to cancel a job from a job ID. 
@@ -2137,7 +2149,11 @@ def cancel_job( timeout=timeout, ) - return self.job_from_resource(resource["job"]) + job_instance = self.job_from_resource(resource["job"]) # never an UnknownJob + + return typing.cast( + Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob], job_instance, + ) def list_jobs( self, @@ -2148,7 +2164,7 @@ def list_jobs( all_users: bool = None, state_filter: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, min_creation_time: datetime.datetime = None, max_creation_time: datetime.datetime = None, page_size: int = None, @@ -2263,9 +2279,9 @@ def load_table_from_uri( project: str = None, job_config: LoadJobConfig = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: - """Starts a job for loading data into a table from CloudStorage. + """Starts a job for loading data into a table from Cloud Storage. See https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload @@ -2338,7 +2354,7 @@ def load_table_from_uri( def load_table_from_file( self, - file_obj: BinaryIO, + file_obj: IO[bytes], destination: Union[Table, TableReference, TableListItem, str], rewind: bool = False, size: int = None, @@ -2348,7 +2364,7 @@ def load_table_from_file( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: float = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of this table from a file-like object. @@ -2356,42 +2372,42 @@ def load_table_from_file( returns a :class:`~google.cloud.bigquery.job.LoadJob`. Args: - file_obj (file): A file handle opened in binary mode for reading. - destination (Union[ \ - google.cloud.bigquery.table.Table, \ - google.cloud.bigquery.table.TableReference, \ - google.cloud.bigquery.table.TableListItem, \ - str, \ - ]): + file_obj: + A file handle opened in binary mode for reading. + destination: Table into which data is to be loaded. If a string is passed in, this method attempts to create a table reference from a string using :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - rewind (Optional[bool]): + rewind: If True, seek to the beginning of the file handle before reading the file. - size (Optional[int]): + size: The number of bytes to read from the file handle. If size is ``None`` or large, resumable upload will be used. Otherwise, multipart upload will be used. - num_retries (Optional[int]): Number of upload retries. Defaults to 6. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. Defaults to 6. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. 
Depending on the retry strategy, a request + may be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2439,11 +2455,11 @@ def load_table_from_file( except resumable_media.InvalidResponse as exc: raise exceptions.from_http_response(exc.response) - return self.job_from_resource(response.json()) + return typing.cast(LoadJob, self.job_from_resource(response.json())) def load_table_from_dataframe( self, - dataframe, + dataframe: "pandas.DataFrame", destination: Union[Table, TableReference, str], num_retries: int = _DEFAULT_NUM_RETRIES, job_id: str = None, @@ -2452,7 +2468,7 @@ def load_table_from_dataframe( project: str = None, job_config: LoadJobConfig = None, parquet_compression: str = "snappy", - timeout: float = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a pandas DataFrame. @@ -2471,9 +2487,9 @@ def load_table_from_dataframe( https://github.com/googleapis/python-bigquery/issues/19 Args: - dataframe (pandas.DataFrame): + dataframe: A :class:`~pandas.DataFrame` containing the data to load. - destination (google.cloud.bigquery.table.TableReference): + destination: The destination table to use for loading the data. If it is an existing table, the schema of the :class:`~pandas.DataFrame` must match the schema of the destination table. If the table @@ -2485,19 +2501,19 @@ def load_table_from_dataframe( :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - num_retries (Optional[int]): Number of upload retries. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. To override the default pandas data type conversions, supply @@ -2514,7 +2530,7 @@ def load_table_from_dataframe( :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are supported. - parquet_compression (Optional[str]): + parquet_compression: [Beta] The compression method to use if intermittently serializing ``dataframe`` to a parquet file. @@ -2527,9 +2543,13 @@ def load_table_from_dataframe( passed as the ``compression`` argument to the underlying ``DataFrame.to_parquet()`` method. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. 
Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2592,7 +2612,7 @@ def load_table_from_dataframe( try: table = self.get_table(destination) except core_exceptions.NotFound: - table = None + pass else: columns_and_indexes = frozenset( name @@ -2707,7 +2727,7 @@ def load_table_from_json( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: float = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a JSON string or dict. @@ -2731,36 +2751,35 @@ def load_table_from_json( client = bigquery.Client() client.load_table_from_file(data_as_file, ...) - destination (Union[ \ - google.cloud.bigquery.table.Table, \ - google.cloud.bigquery.table.TableReference, \ - google.cloud.bigquery.table.TableListItem, \ - str, \ - ]): + destination: Table into which data is to be loaded. If a string is passed in, this method attempts to create a table reference from a string using :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - num_retries (Optional[int]): Number of upload retries. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. The ``source_format`` setting is always set to :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`. - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2809,60 +2828,77 @@ def load_table_from_json( ) def _do_resumable_upload( - self, stream, metadata, num_retries, timeout, project=None - ): + self, + stream: IO[bytes], + metadata: Mapping[str, str], + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, + ) -> "requests.Response": """Perform a resumable upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. 
Defaults to the client's project. Returns: - requests.Response: - The "200 OK" response object returned after the final chunk - is uploaded. + The "200 OK" response object returned after the final chunk + is uploaded. """ upload, transport = self._initiate_resumable_upload( stream, metadata, num_retries, timeout, project=project ) while not upload.finished: - response = upload.transmit_next_chunk(transport) + response = upload.transmit_next_chunk(transport, timeout=timeout) return response def _initiate_resumable_upload( - self, stream, metadata, num_retries, timeout, project=None + self, + stream: IO[bytes], + metadata: Mapping[str, str], + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, ): """Initiate a resumable upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. @@ -2911,29 +2947,39 @@ def _initiate_resumable_upload( return upload, transport def _do_multipart_upload( - self, stream, metadata, size, num_retries, timeout, project=None + self, + stream: IO[bytes], + metadata: Mapping[str, str], + size: int, + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, ): """Perform a multipart upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - size (int): + size: The number of bytes to be uploaded (which will be read from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. @@ -2995,7 +3041,7 @@ def copy_table( project: str = None, job_config: CopyJobConfig = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> job.CopyJob: """Copy one or more tables to another table. 
@@ -3101,7 +3147,7 @@ def extract_table( project: str = None, job_config: ExtractJobConfig = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, source_type: str = "Table", ) -> job.ExtractJob: """Start a job to extract a table into Cloud Storage files. @@ -3200,7 +3246,7 @@ def query( location: str = None, project: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, job_retry: retries.Retry = DEFAULT_JOB_RETRY, ) -> job.QueryJob: """Run a SQL query. @@ -3357,7 +3403,7 @@ def insert_rows( table: Union[Table, TableReference, str], rows: Union[Iterable[Tuple], Iterable[Dict]], selected_fields: Sequence[SchemaField] = None, - **kwargs: dict, + **kwargs, ) -> Sequence[dict]: """Insert rows into a table via the streaming API. @@ -3482,7 +3528,7 @@ def insert_rows_json( ignore_unknown_values: bool = None, template_suffix: str = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Sequence[dict]: """Insert rows into a table without applying local type conversions. @@ -3550,8 +3596,8 @@ def insert_rows_json( # insert_rows_json doesn't need the table schema. It's not doing any # type conversions. table = _table_arg_to_table_ref(table, default_project=self.project) - rows_info = [] - data = {"rows": rows_info} + rows_info: List[Any] = [] + data: Dict[str, Any] = {"rows": rows_info} if row_ids is None: warnings.warn( @@ -3569,7 +3615,7 @@ def insert_rows_json( raise TypeError(msg) for i, row in enumerate(json_rows): - info = {"json": row} + info: Dict[str, Any] = {"json": row} if row_ids is AutoRowIDs.GENERATE_UUID: info["insertId"] = str(uuid.uuid4()) @@ -3618,7 +3664,7 @@ def list_partitions( self, table: Union[Table, TableReference, TableListItem, str], retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> Sequence[str]: """List the partitions in a table. @@ -3669,7 +3715,7 @@ def list_rows( start_index: int = None, page_size: int = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> RowIterator: """List the rows of the table. @@ -3745,7 +3791,7 @@ def list_rows( table = self.get_table(table.reference, retry=retry, timeout=timeout) schema = table.schema - params = {} + params: Dict[str, Any] = {} if selected_fields is not None: params["selectedFields"] = ",".join(field.name for field in selected_fields) if start_index is not None: @@ -3781,7 +3827,7 @@ def _list_rows_from_query_results( start_index: int = None, page_size: int = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: float = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_TIMEOUT, ) -> RowIterator: """List the rows of a completed query. See @@ -3826,7 +3872,7 @@ def _list_rows_from_query_results( Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s. """ - params = { + params: Dict[str, Any] = { "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS, "location": location, } @@ -3867,7 +3913,7 @@ def _schema_to_json_file_object(self, schema_list, file_obj): """ json.dump(schema_list, file_obj, indent=2, sort_keys=True) - def schema_from_json(self, file_or_path: Union[str, BinaryIO]): + def schema_from_json(self, file_or_path: "PathType"): """Takes a file object or file path that contains json that describes a table schema. 
@@ -3881,7 +3927,7 @@ def schema_from_json(self, file_or_path: Union[str, BinaryIO]): return self._schema_from_json_file_object(file_obj) def schema_to_json( - self, schema_list: Sequence[SchemaField], destination: Union[str, BinaryIO] + self, schema_list: Sequence[SchemaField], destination: "PathType" ): """Takes a list of schema field objects. @@ -4023,13 +4069,12 @@ def _extract_job_reference(job, project=None, location=None): return (project, location, job_id) -def _make_job_id(job_id, prefix=None): +def _make_job_id(job_id: Optional[str], prefix: Optional[str] = None) -> str: """Construct an ID for a new job. Args: - job_id (Optional[str]): the user-provided job ID. - - prefix (Optional[str]): the user-provided prefix for a job ID. + job_id: the user-provided job ID. + prefix: the user-provided prefix for a job ID. Returns: str: A job ID diff --git a/google/cloud/bigquery/dataset.py b/google/cloud/bigquery/dataset.py index 21e56f305..ff015d605 100644 --- a/google/cloud/bigquery/dataset.py +++ b/google/cloud/bigquery/dataset.py @@ -18,7 +18,7 @@ import copy -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud.bigquery import _helpers from google.cloud.bigquery.model import ModelReference diff --git a/google/cloud/bigquery/dbapi/_helpers.py b/google/cloud/bigquery/dbapi/_helpers.py index 72e711bcf..e5c7ef7ec 100644 --- a/google/cloud/bigquery/dbapi/_helpers.py +++ b/google/cloud/bigquery/dbapi/_helpers.py @@ -161,7 +161,7 @@ def _parse_struct_fields( yield m.group(1, 2) -SCALAR, ARRAY, STRUCT = "sar" +SCALAR, ARRAY, STRUCT = ("s", "a", "r") def _parse_type( @@ -226,19 +226,19 @@ def complex_query_parameter_type(name: typing.Optional[str], type_: str, base: s type_type, sub_type = _parse_type(type_, name, base) if type_type == SCALAR: - type_ = sub_type + result_type = sub_type elif type_type == ARRAY: - type_ = query.ArrayQueryParameterType(sub_type, name=name) + result_type = query.ArrayQueryParameterType(sub_type, name=name) elif type_type == STRUCT: fields = [ complex_query_parameter_type(field_name, field_type, base) for field_name, field_type in sub_type ] - type_ = query.StructQueryParameterType(*fields, name=name) + result_type = query.StructQueryParameterType(*fields, name=name) else: # pragma: NO COVER raise AssertionError("Bad type_type", type_type) # Can't happen :) - return type_ + return result_type def complex_query_parameter( @@ -256,6 +256,12 @@ def complex_query_parameter( struct>> """ + param: typing.Union[ + query.ScalarQueryParameter, + query.ArrayQueryParameter, + query.StructQueryParameter, + ] + base = base or type_ type_type, sub_type = _parse_type(type_, name, base) diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py index b1239ff57..03f3b72ca 100644 --- a/google/cloud/bigquery/dbapi/cursor.py +++ b/google/cloud/bigquery/dbapi/cursor.py @@ -31,7 +31,7 @@ from google.cloud.bigquery import job from google.cloud.bigquery.dbapi import _helpers from google.cloud.bigquery.dbapi import exceptions -import google.cloud.exceptions +import google.cloud.exceptions # type: ignore _LOGGER = logging.getLogger(__name__) diff --git a/google/cloud/bigquery/external_config.py b/google/cloud/bigquery/external_config.py index 5f284c639..e6f6a97c3 100644 --- a/google/cloud/bigquery/external_config.py +++ b/google/cloud/bigquery/external_config.py @@ -556,6 +556,10 @@ def from_api_repr(cls, resource: dict) -> "GoogleSheetsOptions": ParquetOptions, ) +OptionsType = Union[ + AvroOptions, BigtableOptions, 
CSVOptions, GoogleSheetsOptions, ParquetOptions, +] + class HivePartitioningOptions(object): """[Beta] Options that configure hive partitioning. @@ -664,13 +668,15 @@ def source_format(self): return self._properties["sourceFormat"] @property - def options(self) -> Optional[Union[_OPTION_CLASSES]]: + def options(self) -> Optional[OptionsType]: """Source-specific options.""" for optcls in _OPTION_CLASSES: - if self.source_format == optcls._SOURCE_FORMAT: - options = optcls() - self._properties.setdefault(optcls._RESOURCE_NAME, {}) - options._properties = self._properties[optcls._RESOURCE_NAME] + # The code below is too much magic for mypy to handle. + if self.source_format == optcls._SOURCE_FORMAT: # type: ignore + options: OptionsType = optcls() # type: ignore + options._properties = self._properties.setdefault( + optcls._RESOURCE_NAME, {} # type: ignore + ) return options # No matching source format found. @@ -799,6 +805,13 @@ def schema(self): prop = self._properties.get("schema", {}) return [SchemaField.from_api_repr(field) for field in prop.get("fields", [])] + @schema.setter + def schema(self, value): + prop = value + if value is not None: + prop = {"fields": [field.to_api_repr() for field in value]} + self._properties["schema"] = prop + @property def connection_id(self): """Optional[str]: [Experimental] ID of a BigQuery Connection API @@ -816,13 +829,6 @@ def connection_id(self): def connection_id(self, value): self._properties["connectionId"] = value - @schema.setter - def schema(self, value): - prop = value - if value is not None: - prop = {"fields": [field.to_api_repr() for field in value]} - self._properties["schema"] = prop - @property def avro_options(self) -> Optional[AvroOptions]: """Additional properties to set if ``sourceFormat`` is set to AVRO. diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 88d6bec14..97acab5d2 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -696,7 +696,7 @@ def done( self.reload(retry=retry, timeout=timeout) return self.state == _DONE_STATE - def result( + def result( # type: ignore # (signature complaint) self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None ) -> "_AsyncJob": """Start the job and wait for it to complete and get the result. @@ -921,7 +921,7 @@ def from_api_repr(cls, resource: dict) -> "_JobConfig": # cls is one of the job config subclasses that provides the job_type argument to # this base class on instantiation, thus missing-parameter warning is a false # positive here. - job_config = cls() # pytype: disable=missing-parameter + job_config = cls() # type: ignore # pytype: disable=missing-parameter job_config._properties = resource return job_config diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 942c85fc3..36e388238 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -56,9 +56,9 @@ if typing.TYPE_CHECKING: # pragma: NO COVER # Assumption: type checks are only used by library developers and CI environments # that have all optional dependencies installed, thus no conditional imports. 
- import pandas - import geopandas - import pyarrow + import pandas # type: ignore + import geopandas # type: ignore + import pyarrow # type: ignore from google.api_core import retry as retries from google.cloud import bigquery_storage from google.cloud.bigquery.client import Client @@ -144,7 +144,7 @@ def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats": args = ( int(stats.get(api_field, default_val)) - for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) + for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) # type: ignore ) return cls(*args) @@ -161,7 +161,7 @@ def __init__( statement_byte_budget: Optional[int] = None, key_result_statement: Optional[KeyResultStatementKind] = None, ): - self._properties = {} + self._properties: Dict[str, Any] = {} self.statement_timeout_ms = statement_timeout_ms self.statement_byte_budget = statement_byte_budget self.key_result_statement = key_result_statement @@ -193,9 +193,8 @@ def statement_timeout_ms(self) -> Union[int, None]: @statement_timeout_ms.setter def statement_timeout_ms(self, value: Union[int, None]): - if value is not None: - value = str(value) - self._properties["statementTimeoutMs"] = value + new_value = None if value is None else str(value) + self._properties["statementTimeoutMs"] = new_value @property def statement_byte_budget(self) -> Union[int, None]: @@ -207,9 +206,8 @@ def statement_byte_budget(self) -> Union[int, None]: @statement_byte_budget.setter def statement_byte_budget(self, value: Union[int, None]): - if value is not None: - value = str(value) - self._properties["statementByteBudget"] = value + new_value = None if value is None else str(value) + self._properties["statementByteBudget"] = new_value @property def key_result_statement(self) -> Union[KeyResultStatementKind, None]: @@ -666,9 +664,8 @@ def script_options(self) -> ScriptOptions: @script_options.setter def script_options(self, value: Union[ScriptOptions, None]): - if value is not None: - value = value.to_api_repr() - self._set_sub_prop("scriptOptions", value) + new_value = None if value is None else value.to_api_repr() + self._set_sub_prop("scriptOptions", new_value) def to_api_repr(self) -> dict: """Build an API representation of the query job config. 
@@ -1330,7 +1327,7 @@ def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None): except exceptions.GoogleAPIError as exc: self.set_exception(exc) - def result( + def result( # type: ignore # (complaints about the overloaded signature) self, page_size: int = None, max_results: int = None, @@ -1400,7 +1397,7 @@ def result( retry_do_query = getattr(self, "_retry_do_query", None) if retry_do_query is not None: if job_retry is DEFAULT_JOB_RETRY: - job_retry = self._job_retry + job_retry = self._job_retry # type: ignore else: if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY: raise TypeError( @@ -1451,7 +1448,7 @@ def do_get_result(): except exceptions.GoogleAPICallError as exc: exc.message += self._format_for_exception(self.query, self.job_id) - exc.query_job = self + exc.query_job = self # type: ignore raise except requests.exceptions.Timeout as exc: raise concurrent.futures.TimeoutError from exc diff --git a/google/cloud/bigquery/magics/line_arg_parser/lexer.py b/google/cloud/bigquery/magics/line_arg_parser/lexer.py index cd809c389..71b287d01 100644 --- a/google/cloud/bigquery/magics/line_arg_parser/lexer.py +++ b/google/cloud/bigquery/magics/line_arg_parser/lexer.py @@ -98,7 +98,7 @@ def _generate_next_value_(name, start, count, last_values): return name -TokenType = AutoStrEnum( # pytype: disable=wrong-arg-types +TokenType = AutoStrEnum( # type: ignore # pytype: disable=wrong-arg-types "TokenType", [ (name, enum.auto()) diff --git a/google/cloud/bigquery/magics/magics.py b/google/cloud/bigquery/magics/magics.py index ec0430518..5af0a3b51 100644 --- a/google/cloud/bigquery/magics/magics.py +++ b/google/cloud/bigquery/magics/magics.py @@ -90,16 +90,16 @@ from concurrent import futures try: - import IPython - from IPython import display - from IPython.core import magic_arguments + import IPython # type: ignore + from IPython import display # type: ignore + from IPython.core import magic_arguments # type: ignore except ImportError: # pragma: NO COVER raise ImportError("This module can only be loaded in IPython.") from google.api_core import client_info from google.api_core import client_options from google.api_core.exceptions import NotFound -import google.auth +import google.auth # type: ignore from google.cloud import bigquery import google.cloud.bigquery.dataset from google.cloud.bigquery.dbapi import _helpers @@ -596,6 +596,29 @@ def _cell_magic(line, query): _handle_error(error, args.destination_var) return + # Check if query is given as a reference to a variable. + if query.startswith("$"): + query_var_name = query[1:] + + if not query_var_name: + missing_msg = 'Missing query variable name, empty "$" is not allowed.' + raise NameError(missing_msg) + + if query_var_name.isidentifier(): + ip = IPython.get_ipython() + query = ip.user_ns.get(query_var_name, ip) # ip serves as a sentinel + + if query is ip: + raise NameError( + f"Unknown query, variable {query_var_name} does not exist." + ) + else: + if not isinstance(query, (str, bytes)): + raise TypeError( + f"Query variable {query_var_name} must be a string " + "or a bytes-like value." 
+ ) + # Any query that does not contain whitespace (aside from leading and trailing whitespace) # is assumed to be a table id if not re.search(r"\s", query): diff --git a/google/cloud/bigquery/model.py b/google/cloud/bigquery/model.py index 2d3f6660f..cdb411e08 100644 --- a/google/cloud/bigquery/model.py +++ b/google/cloud/bigquery/model.py @@ -20,8 +20,8 @@ from google.protobuf import json_format -import google.cloud._helpers -from google.api_core import datetime_helpers +import google.cloud._helpers # type: ignore +from google.api_core import datetime_helpers # type: ignore from google.cloud.bigquery import _helpers from google.cloud.bigquery_v2 import types from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration diff --git a/google/cloud/bigquery/opentelemetry_tracing.py b/google/cloud/bigquery/opentelemetry_tracing.py index 57f258ac4..748f2136d 100644 --- a/google/cloud/bigquery/opentelemetry_tracing.py +++ b/google/cloud/bigquery/opentelemetry_tracing.py @@ -14,12 +14,12 @@ import logging from contextlib import contextmanager -from google.api_core.exceptions import GoogleAPICallError +from google.api_core.exceptions import GoogleAPICallError # type: ignore logger = logging.getLogger(__name__) try: from opentelemetry import trace - from opentelemetry.instrumentation.utils import http_status_to_canonical_code + from opentelemetry.instrumentation.utils import http_status_to_status_code from opentelemetry.trace.status import Status HAS_OPENTELEMETRY = True @@ -65,9 +65,10 @@ def create_span(name, attributes=None, client=None, job_ref=None): if not _warned_telemetry: logger.debug( "This service is instrumented using OpenTelemetry. " - "OpenTelemetry could not be imported; please " - "add opentelemetry-api and opentelemetry-instrumentation " - "packages in order to get BigQuery Tracing data." + "OpenTelemetry or one of its components could not be imported; " + "please add compatible versions of opentelemetry-api and " + "opentelemetry-instrumentation packages in order to get BigQuery " + "Tracing data." ) _warned_telemetry = True @@ -81,7 +82,7 @@ def create_span(name, attributes=None, client=None, job_ref=None): yield span except GoogleAPICallError as error: if error.code is not None: - span.set_status(Status(http_status_to_canonical_code(error.code))) + span.set_status(Status(http_status_to_status_code(error.code))) raise diff --git a/google/cloud/bigquery/query.py b/google/cloud/bigquery/query.py index 708f5f47b..637be62be 100644 --- a/google/cloud/bigquery/query.py +++ b/google/cloud/bigquery/query.py @@ -367,14 +367,14 @@ class _AbstractQueryParameter(object): """ @classmethod - def from_api_repr(cls, resource: dict) -> "ScalarQueryParameter": + def from_api_repr(cls, resource: dict) -> "_AbstractQueryParameter": """Factory: construct parameter from JSON resource. Args: resource (Dict): JSON mapping of parameter Returns: - google.cloud.bigquery.query.ScalarQueryParameter + A new instance of _AbstractQueryParameter subclass. 
""" raise NotImplementedError @@ -471,7 +471,7 @@ def to_api_repr(self) -> dict: converter = _SCALAR_VALUE_TO_JSON_PARAM.get(self.type_) if converter is not None: value = converter(value) - resource = { + resource: Dict[str, Any] = { "parameterType": {"type": self.type_}, "parameterValue": {"value": value}, } @@ -734,7 +734,7 @@ def from_api_repr(cls, resource: dict) -> "StructQueryParameter": struct_values = resource["parameterValue"]["structValues"] for key, value in struct_values.items(): type_ = types[key] - converted = None + converted: Optional[Union[ArrayQueryParameter, StructQueryParameter]] = None if type_ == "STRUCT": struct_resource = { "name": key, diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 8a86973cd..254b26608 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -14,7 +14,7 @@ from google.api_core import exceptions from google.api_core import retry -from google.auth import exceptions as auth_exceptions +from google.auth import exceptions as auth_exceptions # type: ignore import requests.exceptions diff --git a/google/cloud/bigquery/routine/routine.py b/google/cloud/bigquery/routine/routine.py index a776212c3..a66434300 100644 --- a/google/cloud/bigquery/routine/routine.py +++ b/google/cloud/bigquery/routine/routine.py @@ -18,7 +18,7 @@ from google.protobuf import json_format -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud.bigquery import _helpers import google.cloud.bigquery_v2.types from google.cloud.bigquery_v2.types import StandardSqlTableType diff --git a/google/cloud/bigquery/schema.py b/google/cloud/bigquery/schema.py index 225942234..2af61b672 100644 --- a/google/cloud/bigquery/schema.py +++ b/google/cloud/bigquery/schema.py @@ -16,7 +16,7 @@ import collections import enum -from typing import Iterable, Union +from typing import Any, Dict, Iterable, Union from google.cloud.bigquery_v2 import types @@ -106,7 +106,7 @@ def __init__( scale: Union[int, _DefaultSentinel] = _DEFAULT_VALUE, max_length: Union[int, _DefaultSentinel] = _DEFAULT_VALUE, ): - self._properties = { + self._properties: Dict[str, Any] = { "name": name, "type": field_type, } diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 608218fdc..a0696f83f 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -21,37 +21,37 @@ import functools import operator import typing -from typing import Any, Dict, Iterable, Iterator, Optional, Tuple +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union import warnings try: - import pandas + import pandas # type: ignore except ImportError: # pragma: NO COVER pandas = None try: - import geopandas + import geopandas # type: ignore except ImportError: geopandas = None else: _COORDINATE_REFERENCE_SYSTEM = "EPSG:4326" try: - import shapely.geos + import shapely.geos # type: ignore except ImportError: shapely = None else: _read_wkt = shapely.geos.WKTReader(shapely.geos.lgeos).read try: - import pyarrow + import pyarrow # type: ignore except ImportError: # pragma: NO COVER pyarrow = None import google.api_core.exceptions from google.api_core.page_iterator import HTTPIterator -import google.cloud._helpers +import google.cloud._helpers # type: ignore from google.cloud.bigquery import _helpers from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError @@ -130,7 +130,7 @@ def _view_use_legacy_sql_getter(table): class 
_TableBase: """Base class for Table-related classes with common functionality.""" - _PROPERTY_TO_API_FIELD = { + _PROPERTY_TO_API_FIELD: Dict[str, Union[str, List[str]]] = { "dataset_id": ["tableReference", "datasetId"], "project": ["tableReference", "projectId"], "table_id": ["tableReference", "tableId"], @@ -807,7 +807,7 @@ def view_query(self): view_use_legacy_sql = property(_view_use_legacy_sql_getter) - @view_use_legacy_sql.setter + @view_use_legacy_sql.setter # type: ignore # (redefinition from above) def view_use_legacy_sql(self, value): if not isinstance(value, bool): raise ValueError("Pass a boolean") @@ -1629,8 +1629,49 @@ def _to_page_iterable( ) yield from result_pages - def _to_arrow_iterable(self, bqstorage_client=None): - """Create an iterable of arrow RecordBatches, to process the table as a stream.""" + def to_arrow_iterable( + self, + bqstorage_client: "bigquery_storage.BigQueryReadClient" = None, + max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore + ) -> Iterator["pyarrow.RecordBatch"]: + """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream. + + Args: + bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]): + A BigQuery Storage API client. If supplied, use the faster + BigQuery Storage API to fetch rows from BigQuery. + + This method requires the ``pyarrow`` and + ``google-cloud-bigquery-storage`` libraries. + + This method only exposes a subset of the capabilities of the + BigQuery Storage API. For full access to all features + (projections, filters, snapshots) use the Storage API directly. + + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + By default, the max queue size is set to the number of BQ Storage streams + created by the server. If ``max_queue_size`` is :data:`None`, the queue + size is infinite. + + Returns: + pyarrow.RecordBatch: + A generator of :class:`~pyarrow.RecordBatch`. + + Raises: + ValueError: + If the :mod:`pyarrow` library cannot be imported. + + .. 
versionadded:: 2.31.0 + """ + if pyarrow is None: + raise ValueError(_NO_PYARROW_ERROR) + + self._maybe_warn_max_results(bqstorage_client) + bqstorage_download = functools.partial( _pandas_helpers.download_arrow_bqstorage, self._project, @@ -1638,6 +1679,7 @@ def _to_arrow_iterable(self, bqstorage_client=None): bqstorage_client, preserve_order=self._preserve_order, selected_fields=self._selected_fields, + max_queue_size=max_queue_size, ) tabledata_list_download = functools.partial( _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema @@ -1729,7 +1771,7 @@ def to_arrow( ) record_batches = [] - for record_batch in self._to_arrow_iterable( + for record_batch in self.to_arrow_iterable( bqstorage_client=bqstorage_client ): record_batches.append(record_batch) @@ -1746,7 +1788,7 @@ def to_arrow( progress_bar.close() finally: if owns_bqstorage_client: - bqstorage_client._transport.grpc_channel.close() + bqstorage_client._transport.grpc_channel.close() # type: ignore if record_batches and bqstorage_client is not None: return pyarrow.Table.from_batches(record_batches) @@ -1763,7 +1805,7 @@ def to_dataframe_iterable( self, bqstorage_client: "bigquery_storage.BigQueryReadClient" = None, dtypes: Dict[str, Any] = None, - max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, + max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore ) -> "pandas.DataFrame": """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -2225,6 +2267,33 @@ def to_dataframe_iterable( raise ValueError(_NO_PANDAS_ERROR) return iter((pandas.DataFrame(),)) + def to_arrow_iterable( + self, + bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, + max_queue_size: Optional[int] = None, + ) -> Iterator["pyarrow.RecordBatch"]: + """Create an iterable of pandas DataFrames, to process the table as a stream. + + .. versionadded:: 2.31.0 + + Args: + bqstorage_client: + Ignored. Added for compatibility with RowIterator. + + max_queue_size: + Ignored. Added for compatibility with RowIterator. + + Returns: + An iterator yielding a single empty :class:`~pyarrow.RecordBatch`. + + Raises: + ValueError: + If the :mod:`pyarrow` library cannot be imported. + """ + if pyarrow is None: + raise ValueError(_NO_PYARROW_ERROR) + return iter((pyarrow.record_batch([]),)) + def __iter__(self): return iter(()) @@ -2307,8 +2376,6 @@ def __repr__(self): key_vals = ["{}={}".format(key, val) for key, val in self._key()] return "PartitionRange({})".format(", ".join(key_vals)) - __hash__ = None - class RangePartitioning(object): """Range-based partitioning configuration for a table. @@ -2387,8 +2454,6 @@ def __repr__(self): key_vals = ["{}={}".format(key, repr(val)) for key, val in self._key()] return "RangePartitioning({})".format(", ".join(key_vals)) - __hash__ = None - class TimePartitioningType(object): """Specifies the type of time partitioning to perform.""" @@ -2657,7 +2722,7 @@ def _rows_page_start(iterator, page, response): # pylint: enable=unused-argument -def _table_arg_to_table_ref(value, default_project=None): +def _table_arg_to_table_ref(value, default_project=None) -> TableReference: """Helper to convert a string or Table to TableReference. This function keeps TableReference and other kinds of objects unchanged. 
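The table.py changes above promote the private `_to_arrow_iterable` to the public `RowIterator.to_arrow_iterable` and thread `max_queue_size` through to the BigQuery Storage download path. A minimal usage sketch of the new method (the table ID below is hypothetical, and the BigQuery Storage client is optional):

    from google.cloud import bigquery

    client = bigquery.Client()
    # Hypothetical fully-qualified table ID; list_rows() returns a RowIterator.
    row_iterator = client.list_rows("my-project.my_dataset.my_table")

    # Stream the table as pyarrow.RecordBatch objects instead of materializing
    # a single large pyarrow.Table in memory.
    for record_batch in row_iterator.to_arrow_iterable():
        print(record_batch.num_rows, record_batch.schema.names)

    # Optionally pass a BigQuery Storage client for faster downloads and bound
    # the number of buffered result pages with max_queue_size (None = unbounded).
    # from google.cloud import bigquery_storage
    # bqstorage_client = bigquery_storage.BigQueryReadClient()
    # for record_batch in row_iterator.to_arrow_iterable(
    #     bqstorage_client=bqstorage_client, max_queue_size=10
    # ):
    #     ...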
@@ -2669,7 +2734,7 @@ def _table_arg_to_table_ref(value, default_project=None): return value -def _table_arg_to_table(value, default_project=None): +def _table_arg_to_table(value, default_project=None) -> Table: """Helper to convert a string or TableReference to a Table. This function keeps Table and other kinds of objects unchanged. diff --git a/google/cloud/bigquery/version.py b/google/cloud/bigquery/version.py index 877ea53d8..6329658af 100644 --- a/google/cloud/bigquery/version.py +++ b/google/cloud/bigquery/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.30.1" +__version__ = "2.31.0" diff --git a/google/cloud/bigquery_v2/types/model.py b/google/cloud/bigquery_v2/types/model.py index a56b21491..440bc0805 100644 --- a/google/cloud/bigquery_v2/types/model.py +++ b/google/cloud/bigquery_v2/types/model.py @@ -573,9 +573,11 @@ class FeatureValue(proto.Message): numerical_value (google.protobuf.wrappers_pb2.DoubleValue): The numerical feature value. This is the centroid value for this feature. + This field is a member of `oneof`_ ``value``. categorical_value (google.cloud.bigquery_v2.types.Model.ClusteringMetrics.Cluster.FeatureValue.CategoricalValue): The categorical feature value. + This field is a member of `oneof`_ ``value``. """ @@ -804,24 +806,30 @@ class EvaluationMetrics(proto.Message): regression_metrics (google.cloud.bigquery_v2.types.Model.RegressionMetrics): Populated for regression models and explicit feedback type matrix factorization models. + This field is a member of `oneof`_ ``metrics``. binary_classification_metrics (google.cloud.bigquery_v2.types.Model.BinaryClassificationMetrics): Populated for binary classification/classifier models. + This field is a member of `oneof`_ ``metrics``. multi_class_classification_metrics (google.cloud.bigquery_v2.types.Model.MultiClassClassificationMetrics): Populated for multi-class classification/classifier models. + This field is a member of `oneof`_ ``metrics``. clustering_metrics (google.cloud.bigquery_v2.types.Model.ClusteringMetrics): Populated for clustering models. + This field is a member of `oneof`_ ``metrics``. ranking_metrics (google.cloud.bigquery_v2.types.Model.RankingMetrics): Populated for implicit feedback type matrix factorization models. + This field is a member of `oneof`_ ``metrics``. arima_forecasting_metrics (google.cloud.bigquery_v2.types.Model.ArimaForecastingMetrics): Populated for ARIMA models. + This field is a member of `oneof`_ ``metrics``. """ diff --git a/google/cloud/bigquery_v2/types/standard_sql.py b/google/cloud/bigquery_v2/types/standard_sql.py index d6c133634..e10619482 100644 --- a/google/cloud/bigquery_v2/types/standard_sql.py +++ b/google/cloud/bigquery_v2/types/standard_sql.py @@ -49,10 +49,12 @@ class StandardSqlDataType(proto.Message): "INT64", "DATE", "ARRAY"). array_element_type (google.cloud.bigquery_v2.types.StandardSqlDataType): The type of the array's elements, if type_kind = "ARRAY". + This field is a member of `oneof`_ ``sub_type``. struct_type (google.cloud.bigquery_v2.types.StandardSqlStructType): The fields of this struct, in order, if type_kind = "STRUCT". + This field is a member of `oneof`_ ``sub_type``. 
""" diff --git a/noxfile.py b/noxfile.py index 1879a5cd8..505911861 100644 --- a/noxfile.py +++ b/noxfile.py @@ -22,6 +22,7 @@ import nox +MYPY_VERSION = "mypy==0.910" PYTYPE_VERSION = "pytype==2021.4.9" BLACK_VERSION = "black==19.10b0" BLACK_PATHS = ("docs", "google", "samples", "tests", "noxfile.py", "setup.py") @@ -41,6 +42,7 @@ "lint", "lint_setup_py", "blacken", + "mypy", "pytype", "docs", ] @@ -113,9 +115,24 @@ def unit_noextras(session): default(session, install_extras=False) +@nox.session(python=DEFAULT_PYTHON_VERSION) +def mypy(session): + """Run type checks with mypy.""" + session.install("-e", ".[all]") + session.install("ipython") + session.install(MYPY_VERSION) + + # Just install the dependencies' type info directly, since "mypy --install-types" + # might require an additional pass. + session.install( + "types-protobuf", "types-python-dateutil", "types-requests", "types-setuptools", + ) + session.run("mypy", "google/cloud") + + @nox.session(python=DEFAULT_PYTHON_VERSION) def pytype(session): - """Run type checks.""" + """Run type checks with pytype.""" # An indirect dependecy attrs==21.1.0 breaks the check, and installing a less # recent version avoids the error until a possibly better fix is found. # https://github.com/googleapis/python-bigquery/issues/655 diff --git a/setup.py b/setup.py index db69c45b1..5c0b80f7c 100644 --- a/setup.py +++ b/setup.py @@ -63,9 +63,9 @@ "bignumeric_type": pyarrow_dep, "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], "opentelemetry": [ - "opentelemetry-api >= 0.11b0", - "opentelemetry-sdk >= 0.11b0", - "opentelemetry-instrumentation >= 0.11b0", + "opentelemetry-api >= 1.1.0", + "opentelemetry-sdk >= 1.1.0", + "opentelemetry-instrumentation >= 0.20b0", ], } diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 59913d588..f967077bc 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -11,9 +11,9 @@ google-cloud-bigquery-storage==2.0.0 google-cloud-core==1.4.1 google-resumable-media==0.6.0 grpcio==1.38.1 -opentelemetry-api==0.11b0 -opentelemetry-instrumentation==0.11b0 -opentelemetry-sdk==0.11b0 +opentelemetry-api==1.1.0 +opentelemetry-instrumentation==0.20b0 +opentelemetry-sdk==1.1.0 pandas==0.24.2 proto-plus==1.10.0 protobuf==3.12.0 diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py index 1f43a369a..1541dd3b9 100644 --- a/tests/system/test_pandas.py +++ b/tests/system/test_pandas.py @@ -20,6 +20,7 @@ import json import io import operator +import warnings import google.api_core.retry import pkg_resources @@ -976,9 +977,17 @@ def test_to_geodataframe(bigquery_client, dataset_id): assert df["geog"][2] == wkt.loads("point(0 0)") assert isinstance(df, geopandas.GeoDataFrame) assert isinstance(df["geog"], geopandas.GeoSeries) - assert df.area[0] == 0.5 - assert pandas.isna(df.area[1]) - assert df.area[2] == 0.0 + + with warnings.catch_warnings(): + # Computing the area on a GeoDataFrame that uses a geographic Coordinate + # Reference System (CRS) produces a warning that we are not interested in. + # We do not mind if the computed area is incorrect with respect to the + # GeoDataFrame data, as long as it matches the expected "incorrect" value. 
+ warnings.filterwarnings("ignore", category=UserWarning) + assert df.area[0] == 0.5 + assert pandas.isna(df.area[1]) + assert df.area[2] == 0.0 + assert df.crs.srs == "EPSG:4326" assert df.crs.name == "WGS 84" assert df.geog.crs.srs == "EPSG:4326" diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 11b336728..9c93765e8 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -36,16 +36,24 @@ import pandas except (ImportError, AttributeError): # pragma: NO COVER pandas = None + try: import opentelemetry - from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, - ) -except (ImportError, AttributeError): # pragma: NO COVER +except ImportError: opentelemetry = None + +if opentelemetry is not None: + try: + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + except (ImportError, AttributeError) as exc: # pragma: NO COVER + msg = "Error importing from opentelemetry, is the installed version compatible?" + raise ImportError(msg) from exc + try: import pyarrow except (ImportError, AttributeError): # pragma: NO COVER @@ -784,9 +792,12 @@ def test_span_status_is_set(self): tracer_provider = TracerProvider() memory_exporter = InMemorySpanExporter() - span_processor = SimpleExportSpanProcessor(memory_exporter) + span_processor = SimpleSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) - trace.set_tracer_provider(tracer_provider) + + # OpenTelemetry API >= 0.12b0 does not allow overriding the tracer once + # initialized, thus directly override the internal global var. + tracer_patcher = mock.patch.object(trace, "_TRACER_PROVIDER", tracer_provider) creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) @@ -797,7 +808,7 @@ def test_span_status_is_set(self): full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) - with pytest.raises(google.api_core.exceptions.AlreadyExists): + with pytest.raises(google.api_core.exceptions.AlreadyExists), tracer_patcher: client.create_routine(routine) span_list = memory_exporter.get_finished_spans() @@ -8224,6 +8235,22 @@ def test__do_resumable_upload_custom_project(self): assert initiation_url is not None assert "projects/custom-project" in initiation_url + def test__do_resumable_upload_custom_timeout(self): + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + transport = self._make_transport( + self._make_resumable_upload_responses(file_obj_len) + ) + client = self._make_client(transport) + + client._do_resumable_upload( + file_obj, self.EXPECTED_CONFIGURATION, num_retries=0, timeout=3.14 + ) + + # The timeout should be applied to all underlying calls. 
+ for call_args in transport.request.call_args_list: + assert call_args.kwargs.get("timeout") == 3.14 + def test__do_multipart_upload(self): transport = self._make_transport([self._make_response(http.client.OK)]) client = self._make_client(transport) @@ -8431,7 +8458,7 @@ def test_upload_chunksize(client): upload.finished = False - def transmit_next_chunk(transport): + def transmit_next_chunk(transport, *args, **kwargs): upload.finished = True result = mock.MagicMock() result.json.return_value = {} diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py index 36cbf4993..e18d04d64 100644 --- a/tests/unit/test_magics.py +++ b/tests/unit/test_magics.py @@ -584,7 +584,7 @@ def test_bigquery_magic_does_not_clear_display_in_verbose_mode(): @pytest.mark.usefixtures("ipython_interactive") -def test_bigquery_magic_clears_display_in_verbose_mode(): +def test_bigquery_magic_clears_display_in_non_verbose_mode(): ip = IPython.get_ipython() ip.extension_manager.load_extension("google.cloud.bigquery") magics.context.credentials = mock.create_autospec( @@ -1710,6 +1710,143 @@ def test_bigquery_magic_with_improperly_formatted_params(): ip.run_cell_magic("bigquery", "--params {17}", sql) +@pytest.mark.parametrize( + "raw_sql", ("SELECT answer AS 42", " \t SELECT answer AS 42 \t ") +) +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_valid_query_in_existing_variable(ipython_ns_cleanup, raw_sql): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + ipython_ns_cleanup.append((ip, "custom_query")) + ipython_ns_cleanup.append((ip, "query_results_df")) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + query_job_mock = mock.create_autospec( + google.cloud.bigquery.job.QueryJob, instance=True + ) + mock_result = pandas.DataFrame([42], columns=["answer"]) + query_job_mock.to_dataframe.return_value = mock_result + + ip.user_ns["custom_query"] = raw_sql + cell_body = "$custom_query" # Referring to an existing variable name (custom_query) + assert "query_results_df" not in ip.user_ns + + with run_query_patch as run_query_mock: + run_query_mock.return_value = query_job_mock + + ip.run_cell_magic("bigquery", "query_results_df", cell_body) + + run_query_mock.assert_called_once_with(mock.ANY, raw_sql, mock.ANY) + + assert "query_results_df" in ip.user_ns # verify that the variable exists + df = ip.user_ns["query_results_df"] + assert len(df) == len(mock_result) # verify row count + assert list(df) == list(mock_result) # verify column names + assert list(df["answer"]) == [42] + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_nonexisting_query_variable(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + + ip.user_ns.pop("custom_query", None) # Make sure the variable does NOT exist. + cell_body = "$custom_query" # Referring to a non-existing variable name. 
+ + with pytest.raises( + NameError, match=r".*custom_query does not exist.*" + ), run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "", cell_body) + + run_query_mock.assert_not_called() + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_empty_query_variable_name(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + cell_body = "$" # Not referring to any variable (name omitted). + + with pytest.raises( + NameError, match=r"(?i).*missing query variable name.*" + ), run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "", cell_body) + + run_query_mock.assert_not_called() + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_query_variable_non_string(ipython_ns_cleanup): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics.magics._run_query", autospec=True + ) + + ipython_ns_cleanup.append((ip, "custom_query")) + + ip.user_ns["custom_query"] = object() + cell_body = "$custom_query" # Referring to a non-string variable. + + with pytest.raises( + TypeError, match=r".*must be a string or a bytes-like.*" + ), run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "", cell_body) + + run_query_mock.assert_not_called() + + +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_bigquery_magic_query_variable_not_identifier(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + cell_body = "$123foo" # 123foo is not valid Python identifier + + with io.capture_output() as captured_io: + ip.run_cell_magic("bigquery", "", cell_body) + + # If "$" prefixes a string that is not a Python identifier, we do not treat such + # cell_body as a variable reference and just treat is as any other cell body input. + # If at the same time the cell body does not contain any whitespace, it is + # considered a table name, thus we expect an error that the table ID is not valid. 
+ output = captured_io.stderr + assert "ERROR:" in output + assert "must be a fully-qualified ID" in output + + @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_bigquery_magic_with_invalid_multiple_option_values(): diff --git a/tests/unit/test_opentelemetry_tracing.py b/tests/unit/test_opentelemetry_tracing.py index 726e3cf6f..3021a3dbf 100644 --- a/tests/unit/test_opentelemetry_tracing.py +++ b/tests/unit/test_opentelemetry_tracing.py @@ -20,14 +20,21 @@ try: import opentelemetry - from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, - ) except ImportError: # pragma: NO COVER opentelemetry = None + +if opentelemetry is not None: + try: + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + except (ImportError, AttributeError) as exc: # pragma: NO COVER + msg = "Error importing from opentelemetry, is the installed version compatible?" + raise ImportError(msg) from exc + import pytest from google.cloud.bigquery import opentelemetry_tracing @@ -42,11 +49,18 @@ def setup(): importlib.reload(opentelemetry_tracing) tracer_provider = TracerProvider() memory_exporter = InMemorySpanExporter() - span_processor = SimpleExportSpanProcessor(memory_exporter) + span_processor = SimpleSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) - trace.set_tracer_provider(tracer_provider) + + # OpenTelemetry API >= 0.12b0 does not allow overriding the tracer once + # initialized, thus directly override (and then restore) the internal global var. + orig_trace_provider = trace._TRACER_PROVIDER + trace._TRACER_PROVIDER = tracer_provider + yield memory_exporter + trace._TRACER_PROVIDER = orig_trace_provider + @pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") def test_opentelemetry_not_installed(setup, monkeypatch): diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 3c68e3c5e..4f45eac3d 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1840,6 +1840,25 @@ def test_to_arrow(self): self.assertIsInstance(tbl, pyarrow.Table) self.assertEqual(tbl.num_rows, 0) + @mock.patch("google.cloud.bigquery.table.pyarrow", new=None) + def test_to_arrow_iterable_error_if_pyarrow_is_none(self): + row_iterator = self._make_one() + with self.assertRaises(ValueError): + row_iterator.to_arrow_iterable() + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_iterable(self): + row_iterator = self._make_one() + arrow_iter = row_iterator.to_arrow_iterable() + + result = list(arrow_iter) + + self.assertEqual(len(result), 1) + record_batch = result[0] + self.assertIsInstance(record_batch, pyarrow.RecordBatch) + self.assertEqual(record_batch.num_rows, 0) + self.assertEqual(record_batch.num_columns, 0) + @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_error_if_pandas_is_none(self): row_iterator = self._make_one() @@ -2151,6 +2170,205 @@ def test__validate_bqstorage_returns_false_w_warning_if_obsolete_version(self): ] assert matching_warnings, "Obsolete dependency warning not raised." 
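The OpenTelemetry changes in this release (the test setup above and the setup.py/constraints bumps) target the >= 1.1.0 API, in which `SimpleExportSpanProcessor` was renamed to `SimpleSpanProcessor`. An illustrative application-side configuration under that API; the console exporter is an arbitrary choice for the sketch, not something this diff prescribes:

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

    from google.cloud import bigquery

    # Configure a tracer provider before creating the BigQuery client. With
    # compatible opentelemetry-api/sdk (>= 1.1.0) and
    # opentelemetry-instrumentation (>= 0.20b0) installed, BigQuery client
    # calls emit spans through this provider.
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

    client = bigquery.Client()
    client.query("SELECT 1").result()  # spans for the underlying API calls are exported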
+ @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_iterable(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + SchemaField( + "child", + "RECORD", + mode="REPEATED", + fields=[ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ], + ), + ] + rows = [ + { + "f": [ + {"v": "Bharney Rhubble"}, + {"v": "33"}, + { + "v": [ + {"v": {"f": [{"v": "Whamm-Whamm Rhubble"}, {"v": "3"}]}}, + {"v": {"f": [{"v": "Hoppy"}, {"v": "1"}]}}, + ] + }, + ] + }, + { + "f": [ + {"v": "Wylma Phlyntstone"}, + {"v": "29"}, + { + "v": [ + {"v": {"f": [{"v": "Bepples Phlyntstone"}, {"v": "0"}]}}, + {"v": {"f": [{"v": "Dino"}, {"v": "4"}]}}, + ] + }, + ] + }, + ] + path = "/foo" + api_request = mock.Mock( + side_effect=[ + {"rows": [rows[0]], "pageToken": "NEXTPAGE"}, + {"rows": [rows[1]]}, + ] + ) + row_iterator = self._make_one( + _mock_client(), api_request, path, schema, page_size=1, max_results=5 + ) + + record_batches = row_iterator.to_arrow_iterable() + self.assertIsInstance(record_batches, types.GeneratorType) + record_batches = list(record_batches) + self.assertEqual(len(record_batches), 2) + + # Check the schema. + for record_batch in record_batches: + self.assertIsInstance(record_batch, pyarrow.RecordBatch) + self.assertEqual(record_batch.schema[0].name, "name") + self.assertTrue(pyarrow.types.is_string(record_batch.schema[0].type)) + self.assertEqual(record_batch.schema[1].name, "age") + self.assertTrue(pyarrow.types.is_int64(record_batch.schema[1].type)) + child_field = record_batch.schema[2] + self.assertEqual(child_field.name, "child") + self.assertTrue(pyarrow.types.is_list(child_field.type)) + self.assertTrue(pyarrow.types.is_struct(child_field.type.value_type)) + self.assertEqual(child_field.type.value_type[0].name, "name") + self.assertEqual(child_field.type.value_type[1].name, "age") + + # Check the data. 
+ record_batch_1 = record_batches[0].to_pydict() + names = record_batch_1["name"] + ages = record_batch_1["age"] + children = record_batch_1["child"] + self.assertEqual(names, ["Bharney Rhubble"]) + self.assertEqual(ages, [33]) + self.assertEqual( + children, + [ + [ + {"name": "Whamm-Whamm Rhubble", "age": 3}, + {"name": "Hoppy", "age": 1}, + ], + ], + ) + + record_batch_2 = record_batches[1].to_pydict() + names = record_batch_2["name"] + ages = record_batch_2["age"] + children = record_batch_2["child"] + self.assertEqual(names, ["Wylma Phlyntstone"]) + self.assertEqual(ages, [29]) + self.assertEqual( + children, + [[{"name": "Bepples Phlyntstone", "age": 0}, {"name": "Dino", "age": 4}]], + ) + + @mock.patch("google.cloud.bigquery.table.pyarrow", new=None) + def test_to_arrow_iterable_error_if_pyarrow_is_none(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + with pytest.raises(ValueError, match="pyarrow"): + row_iterator.to_arrow_iterable() + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_arrow_iterable_w_bqstorage(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1 import reader + + bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) + bqstorage_client._transport = mock.create_autospec( + big_query_read_grpc_transport.BigQueryReadGrpcTransport + ) + streams = [ + # Use two streams we want to check frames are read from each stream. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage.types.ReadSession(streams=streams) + arrow_schema = pyarrow.schema( + [ + pyarrow.field("colA", pyarrow.int64()), + # Not alphabetical to test column order. 
+ pyarrow.field("colC", pyarrow.float64()), + pyarrow.field("colB", pyarrow.string()), + ] + ) + session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + page_items = [ + pyarrow.array([1, -1]), + pyarrow.array([2.0, 4.0]), + pyarrow.array(["abc", "def"]), + ] + + expected_record_batch = pyarrow.RecordBatch.from_arrays( + page_items, schema=arrow_schema + ) + expected_num_record_batches = 3 + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_arrow.return_value = expected_record_batch + mock_pages = (mock_page,) * expected_num_record_batches + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + schema = [ + schema.SchemaField("colA", "INTEGER"), + schema.SchemaField("colC", "FLOAT"), + schema.SchemaField("colB", "STRING"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + record_batches = list( + row_iterator.to_arrow_iterable(bqstorage_client=bqstorage_client) + ) + total_record_batches = len(streams) * len(mock_pages) + self.assertEqual(len(record_batches), total_record_batches) + + for record_batch in record_batches: + # Are the record batches return as expected? + self.assertEqual(record_batch, expected_record_batch) + + # Don't close the client if it was passed in. + bqstorage_client._transport.grpc_channel.close.assert_not_called() + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow(self): from google.cloud.bigquery.schema import SchemaField