adding support for datasets and query arguments in connection string #25

Merged
merged 16 commits on Oct 5, 2018
58 changes: 58 additions & 0 deletions README.rst
@@ -70,6 +70,64 @@ By default, ``arraysize`` is set to ``5000``. ``arraysize`` is used to set the b
    engine = create_engine('bigquery://project', arraysize=1000)


Adding a Default Dataset
________________________

If you want to have the ``Client`` use a default dataset, specify it as the "database" portion of the connection string.

.. code-block:: python

    engine = create_engine('bigquery://project/dataset')

When using a default dataset, don't include the dataset name in the table name, e.g.:

.. code-block:: python

    table = Table('table_name')

Note that specifying a default dataset doesn't restrict execution of queries to that particular dataset when using raw queries, e.g.:

.. code-block:: python

    # Set default dataset to dataset_a
    engine = create_engine('bigquery://project/dataset_a')

    # This will still execute and return rows from dataset_b
    engine.execute('SELECT * FROM dataset_b.table').fetchall()


Connection String Parameters
____________________________

There are many situations where you can't call ``create_engine`` directly, such as when using tools like `Flask SQLAlchemy <http://flask-sqlalchemy.pocoo.org/2.3/>`_. For situations like these, or for situations where you want the ``Client`` to have a `default_query_job_config <https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client>`_, you can pass many arguments in the query of the connection string.

The ``credentials_path``, ``location``, and ``arraysize`` parameters are used by this library, and the rest are used to create a `QueryJobConfig <https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig>`_.

Note that if you use a query string, it is more reliable to include three slashes: ``'bigquery:///?a=b'`` will be parsed consistently, whereas ``'bigquery://?a=b'`` might be interpreted as having a "database" of ``?a=b``, depending on the system used to parse the connection string.
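
For example, a minimal sketch of a connection string that carries only query parameters (the values here are placeholders):

.. code-block:: python

    # Three slashes, so the query string is not mistaken for a "database".
    engine = create_engine('bigquery:///?arraysize=500&use_query_cache=false')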

Here is an example using all of the supported arguments. Any arguments not shown are either specific to legacy SQL (which this library does not support) or are too complex and not implemented.

.. code-block:: python

    engine = create_engine(
        'bigquery://some-project/some-dataset' '?'
        'credentials_path=/some/path/to.json' '&'
        'location=some-location' '&'
        'arraysize=1000' '&'
        'clustering_fields=a,b,c' '&'
        'create_disposition=CREATE_IF_NEEDED' '&'
        'destination=different-project.different-dataset.table' '&'
        'destination_encryption_configuration=some-configuration' '&'
        'dry_run=true' '&'
        'labels=a:b,c:d' '&'
        'maximum_bytes_billed=1000' '&'
        'priority=INTERACTIVE' '&'
        'schema_update_options=ALLOW_FIELD_ADDITION,ALLOW_FIELD_RELAXATION' '&'
        'use_query_cache=true' '&'
        'write_disposition=WRITE_APPEND'
    )
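
For tools that build the engine from configuration rather than calling ``create_engine`` directly (such as Flask SQLAlchemy, mentioned above), the same connection string can be supplied as the database URI. A hedged sketch, assuming a standard Flask SQLAlchemy setup:

.. code-block:: python

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)
    # The dialect receives the full string, including the query parameters.
    app.config['SQLALCHEMY_DATABASE_URI'] = (
        'bigquery://some-project/some-dataset'
        '?credentials_path=/some/path/to.json'
        '&location=some-location'
        '&arraysize=1000'
    )
    db = SQLAlchemy(app)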


Requirements
============

2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -1,5 +1,5 @@
sqlalchemy>=1.1.9
google-cloud-bigquery>=1.5.0
google-cloud-bigquery>=1.6.0
future==0.16.0

pytest==3.2.2
172 changes: 172 additions & 0 deletions pybigquery/parse_url.py
@@ -0,0 +1,172 @@
import re
GROUP_DELIMITER = re.compile(r'\s*\,\s*')
KEY_VALUE_DELIMITER = re.compile(r'\s*\:\s*')

from google.cloud.bigquery import QueryJobConfig
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition, QueryPriority, SchemaUpdateOption

from google.cloud.bigquery.table import EncryptionConfiguration, TableReference
from google.cloud.bigquery.dataset import DatasetReference

def parse_boolean(bool_string):
    bool_string = bool_string.lower()
    if bool_string == 'true':
        return True
    elif bool_string == 'false':
        return False
    else:
        raise ValueError()

def parse_url(url):
    query = url.query

    # use_legacy_sql (legacy)
    if 'use_legacy_sql' in query: raise ValueError("legacy sql is not supported by this dialect")
    # allow_large_results (legacy)
    if 'allow_large_results' in query: raise ValueError("allow_large_results is only allowed for legacy sql, which is not supported by this dialect")
    # flatten_results (legacy)
    if 'flatten_results' in query: raise ValueError("flatten_results is only allowed for legacy sql, which is not supported by this dialect")
    # maximum_billing_tier (deprecated)
    if 'maximum_billing_tier' in query: raise ValueError("maximum_billing_tier is a deprecated argument")

    location = None
    dataset_id = url.database or None
    arraysize = None
    credentials_path = None

    # location
    if 'location' in query:
        location = query.pop('location')

    # credentials_path
    if 'credentials_path' in query:
        credentials_path = query.pop('credentials_path')

    # arraysize
    if 'arraysize' in query:
        str_arraysize = query.pop('arraysize')
        try:
            arraysize = int(str_arraysize)
        except ValueError:
            raise ValueError("invalid int in url query arraysize: " + str_arraysize)

    # if only these "non-config" values were present, the dict will now be empty
    if not query:
        # if a dataset_id exists, we need to return a job_config that isn't None
        # so it can be updated with a dataset reference from the client
        if dataset_id:
            return location, dataset_id, arraysize, credentials_path, QueryJobConfig()
        else:
            return location, dataset_id, arraysize, credentials_path, None

    job_config = QueryJobConfig()

    # clustering_fields list(str)
    if 'clustering_fields' in query:
        clustering_fields = GROUP_DELIMITER.split(query['clustering_fields'])
        job_config.clustering_fields = list(clustering_fields)

    # create_disposition
    if 'create_disposition' in query:
        create_disposition = query['create_disposition']
        try:
            job_config.create_disposition = getattr(CreateDisposition, create_disposition)
        except AttributeError:
            raise ValueError("invalid create_disposition in url query: " + create_disposition)

    # default_dataset
    if 'default_dataset' in query or 'dataset_id' in query or 'project_id' in query:
        raise ValueError("don't pass default_dataset, dataset_id, project_id in url query, instead use the url host and database")

    # destination
    if 'destination' in query:
        dest_project = None
        dest_dataset = None
        dest_table = None

        try:
            dest_project, dest_dataset, dest_table = query['destination'].split('.')
        except ValueError:
            raise ValueError("url query destination parameter should be fully qualified with project, dataset, and table")

        job_config.destination = TableReference(DatasetReference(dest_project, dest_dataset), dest_table)

    # destination_encryption_configuration
    if 'destination_encryption_configuration' in query:
        job_config.destination_encryption_configuration = EncryptionConfiguration(query['destination_encryption_configuration'])

    # dry_run
    if 'dry_run' in query:
        try:
            job_config.dry_run = parse_boolean(query['dry_run'])
        except ValueError:
            raise ValueError("invalid boolean in url query for dry_run: " + query['dry_run'])

    # labels
    if 'labels' in query:
        label_groups = GROUP_DELIMITER.split(query['labels'])
        labels = {}
        for label_group in label_groups:
            try:
                key, value = KEY_VALUE_DELIMITER.split(label_group)
            except ValueError:
                raise ValueError("malformed url query in labels: " + label_group)
            labels[key] = value

        job_config.labels = labels

    # maximum_bytes_billed
    if 'maximum_bytes_billed' in query:
        try:
            job_config.maximum_bytes_billed = int(query['maximum_bytes_billed'])
        except ValueError:
            raise ValueError("invalid int in url query maximum_bytes_billed: " + query['maximum_bytes_billed'])

    # priority
    if 'priority' in query:
        try:
            job_config.priority = getattr(QueryPriority, query['priority'])
        except AttributeError:
            raise ValueError("invalid priority in url query: " + query['priority'])

    # query_parameters
    if 'query_parameters' in query:
        raise NotImplementedError("url query query_parameters not implemented")

    # schema_update_options
    if 'schema_update_options' in query:
        schema_update_options = GROUP_DELIMITER.split(query['schema_update_options'])
        try:
            job_config.schema_update_options = [
                getattr(SchemaUpdateOption, schema_update_option) for schema_update_option in schema_update_options
            ]
        except AttributeError:
            raise ValueError("invalid schema_update_options in url query: " + query['schema_update_options'])

    # table_definitions
    if 'table_definitions' in query:
        raise NotImplementedError("url query table_definitions not implemented")

    # time_partitioning
    if 'time_partitioning' in query:
        raise NotImplementedError("url query time_partitioning not implemented")

    # udf_resources
    if 'udf_resources' in query:
        raise NotImplementedError("url query udf_resources not implemented")

    # use_query_cache
    if 'use_query_cache' in query:
        try:
            job_config.use_query_cache = parse_boolean(query['use_query_cache'])
        except ValueError:
            raise ValueError("invalid boolean in url query for use_query_cache: " + query['use_query_cache'])

    # write_disposition
    if 'write_disposition' in query:
        try:
            job_config.write_disposition = getattr(WriteDisposition, query['write_disposition'])
        except AttributeError:
            raise ValueError("invalid write_disposition in url query: " + query['write_disposition'])

    return location, dataset_id, arraysize, credentials_path, job_config
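
The helper above is exercised by the dialect's ``create_connect_args``. As a standalone illustration, here is a hedged sketch of calling it directly (assuming SQLAlchemy 1.x, where ``URL.query`` is a plain mutable dict; all values are placeholders):

.. code-block:: python

    from sqlalchemy.engine.url import make_url
    from pybigquery.parse_url import parse_url

    url = make_url(
        'bigquery://some-project/some-dataset'
        '?location=some-location&arraysize=1000&use_query_cache=false'
    )
    location, dataset_id, arraysize, credentials_path, job_config = parse_url(url)
    # location == 'some-location', dataset_id == 'some-dataset', arraysize == 1000
    # job_config is a QueryJobConfig with use_query_cache set to False
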
68 changes: 55 additions & 13 deletions pybigquery/sqlalchemy_bigquery.py
@@ -3,9 +3,11 @@
from __future__ import absolute_import
from __future__ import unicode_literals

from google.cloud.bigquery import dbapi
from google.cloud.bigquery.schema import SchemaField
from google.cloud import bigquery
from google.cloud.bigquery import dbapi, QueryJobConfig
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import EncryptionConfiguration
from google.cloud.bigquery.dataset import DatasetReference
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
from sqlalchemy.exc import NoSuchTableError
@@ -17,8 +19,9 @@
from sqlalchemy.sql import elements
import re

from .parse_url import parse_url

FIELD_ILLEGAL_CHARACTERS = re.compile('[^\w]+')
FIELD_ILLEGAL_CHARACTERS = re.compile(r'[^\w]+')


class UniversalSet(object):
@@ -87,7 +90,6 @@ def format_label(self, label, name=None):
        result = self.quote(name)
        return result


_type_map = {
    'STRING': types.String,
    'BOOLEAN': types.Boolean,
@@ -207,25 +209,47 @@ def __init__(
        self.credentials_path = credentials_path
        self.credentials_info = credentials_info
        self.location = location
        self.dataset_id = None

    @classmethod
    def dbapi(cls):
        return dbapi

    def create_connect_args(self, url):
        location, dataset_id, arraysize, credentials_path, default_query_job_config = parse_url(url)

        self.arraysize = self.arraysize or arraysize
        self.location = location or self.location
        self.credentials_path = credentials_path or self.credentials_path
        self.dataset_id = dataset_id

        if self.credentials_path:
            client = bigquery.Client.from_service_account_json(
                self.credentials_path, location=self.location)
                self.credentials_path,
                location=self.location,
                default_query_job_config=default_query_job_config
            )
        elif self.credentials_info:
            credentials = service_account.Credentials.from_service_account_info(
                self.credentials_info)
                self.credentials_info
            )
            client = bigquery.Client(
                project=self.credentials_info.get('project_id'),
                credentials=credentials,
                location=self.location,
                project=self.credentials_info.get('project_id'),
                default_query_job_config=default_query_job_config,
            )
        else:
            client = bigquery.Client(url.host, location=self.location)
            client = bigquery.Client(
                project=url.host,
                location=self.location,
                default_query_job_config=default_query_job_config
            )

        # if dataset_id is set, then we know the job_config isn't None
        if dataset_id:
            default_query_job_config.default_dataset = client.dataset(dataset_id)

        return ([client], {})

    def _json_deserializer(self, row):
@@ -254,10 +278,17 @@ def _get_table(self, connection, table_name, schema=None):
        if isinstance(connection, Engine):
            connection = connection.connect()

        project, dataset, table_name_prepared = self._split_table_name(table_name)
        if dataset is None and schema is not None:
            dataset = schema
        table_name_prepared = dataset = project = None
        if self.dataset_id:
            table_name_prepared = table_name
            dataset = self.dataset_id
            project = None

        else:
            project, dataset, table_name_prepared = self._split_table_name(table_name)
            if dataset is None and schema is not None:
                table_name_prepared = table_name
                dataset = schema

        table = connection.connection._client.dataset(dataset, project=project).table(table_name_prepared)
        try:
@@ -328,7 +359,10 @@ def get_schema_names(self, connection, **kw):
            connection = connection.connect()

        datasets = connection.connection._client.list_datasets()
        return [d.dataset_id for d in datasets]
        if self.dataset_id is not None:
            return [d.dataset_id for d in datasets if d.dataset_id == self.dataset_id]
        else:
            return [d.dataset_id for d in datasets]

    def get_table_names(self, connection, schema=None, **kw):
        if isinstance(connection, Engine):
@@ -339,9 +373,17 @@ def get_table_names(self, connection, schema=None, **kw):
        for d in datasets:
            if schema is not None and d.dataset_id != schema:
                continue

            if self.dataset_id is not None and d.dataset_id != self.dataset_id:
                continue

            tables = connection.connection._client.list_tables(d.reference)
            for t in tables:
                result.append(d.dataset_id + '.' + t.table_id)
                if self.dataset_id is None:
                    table_name = d.dataset_id + '.' + t.table_id
                else:
                    table_name = t.table_id
                result.append(table_name)
        return result

    def do_rollback(self, dbapi_connection):
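
For context, a hedged, illustrative sketch of the client behaviour that ``create_connect_args`` relies on (assuming google-cloud-bigquery >= 1.6.0; project, dataset, and table names are placeholders):

.. code-block:: python

    from google.cloud import bigquery
    from google.cloud.bigquery import QueryJobConfig

    job_config = QueryJobConfig()
    client = bigquery.Client(
        project='some-project',
        default_query_job_config=job_config,
    )
    # The dialect mutates the shared job config after building the client,
    # so unqualified table names resolve against the default dataset.
    job_config.default_dataset = client.dataset('some-dataset')

    rows = client.query('SELECT * FROM some_table LIMIT 10').result()
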
2 changes: 1 addition & 1 deletion setup.py
@@ -25,7 +25,7 @@ def readme():
    ],
    install_requires=[
        'sqlalchemy>=1.1.9',
        'google-cloud-bigquery>=1.5.0',
        'google-cloud-bigquery>=1.6.0',
        'future',
    ],
    tests_require=[