|
| 1 | +# Copyright 2021 Google LLC All rights reserved. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +"""Classes for representing bundles for the Google Cloud Firestore API.""" |
| 16 | + |
| 17 | +import datetime |
| 18 | +import json |
| 19 | + |
| 20 | +from google.cloud.firestore_bundle.types.bundle import ( |
| 21 | + BundledDocumentMetadata, |
| 22 | + BundledQuery, |
| 23 | + BundleElement, |
| 24 | + BundleMetadata, |
| 25 | + NamedQuery, |
| 26 | +) |
| 27 | +from google.cloud._helpers import _datetime_to_pb_timestamp, UTC # type: ignore |
| 28 | +from google.cloud.firestore_bundle._helpers import limit_type_of_query |
| 29 | +from google.cloud.firestore_v1.async_query import AsyncQuery |
| 30 | +from google.cloud.firestore_v1.base_client import BaseClient |
| 31 | +from google.cloud.firestore_v1.base_document import DocumentSnapshot |
| 32 | +from google.cloud.firestore_v1.base_query import BaseQuery |
| 33 | +from google.cloud.firestore_v1.document import DocumentReference |
| 34 | +from google.cloud.firestore_v1 import _helpers |
| 35 | +from google.protobuf.timestamp_pb2 import Timestamp # type: ignore |
| 36 | +from google.protobuf import json_format # type: ignore |
| 37 | +from typing import ( |
| 38 | + Dict, |
| 39 | + List, |
| 40 | + Optional, |
| 41 | + Union, |
| 42 | +) |
| 43 | + |
| 44 | + |
class FirestoreBundle:
    """A group of serialized documents and queries, suitable for
    longterm storage or query resumption.

    If any queries are added to this bundle, all associated documents will be
    loaded and stored in memory for serialization.

    Usage:

        from google.cloud.firestore import Client
        from google.cloud.firestore_bundle import FirestoreBundle

        db = Client()
        bundle = FirestoreBundle('my-bundle')
        bundle.add_named_query('all-users', db.collection('users')._query())
        bundle.add_named_query(
            'top-ten-hamburgers',
            db.collection('hamburgers').limit(limit=10)._query(),
        )
        serialized: str = bundle.build()

        # Store somewhere like your GCS for retrieval by a client SDK.

    Args:
        name (str): The Id of the bundle.
    """

    BUNDLE_SCHEMA_VERSION: int = 1

    def __init__(self, name: str) -> None:
        self.name: str = name
        # Bundled documents, keyed by their full document path.
        self.documents: Dict[str, "_BundledDocument"] = {}
        # Named queries, keyed by the name they were registered under.
        self.named_queries: Dict[str, NamedQuery] = {}
        # Most recent read time seen across all added documents and queries.
        self.latest_read_time: Timestamp = Timestamp(seconds=0, nanos=0)
        # Bundle-level metadata captured while deserializing; cleared whenever
        # the bundle's contents change (see `_reset_metadata`).
        self._deserialized_metadata: Optional[BundleMetadata] = None
        # Document metadata seen while deserializing, keyed by document name,
        # held until the matching `document` element arrives.
        self._doc_metadata_map: Dict[str, BundledDocumentMetadata] = {}

    def add_document(self, snapshot: DocumentSnapshot) -> "FirestoreBundle":
        """Adds a document to the bundle.

        If the document is already present, the stored copy is only replaced
        when the supplied snapshot's read time is the same as, or later than,
        that of the stored copy. Query names already associated with the
        document are carried over to the replacement.

        Args:
            snapshot (DocumentSnapshot): The fully-loaded Firestore document to
                be preserved.

        Example:

            from google.cloud import firestore
            from google.cloud.firestore_bundle import FirestoreBundle

            db = firestore.Client()
            collection_ref = db.collection(u'users')

            bundle = FirestoreBundle('my bundle')
            bundle.add_document(collection_ref.document('some_id').get())

        Returns:
            FirestoreBundle: self
        """
        full_document_path: str = snapshot.reference._document_path
        original_document: Optional[_BundledDocument] = self.documents.get(
            full_document_path
        )

        original_queries: Optional[List[str]] = []
        if original_document is not None:
            original_queries = original_document.metadata.queries  # type: ignore

        # Use the incoming snapshot when the document is new to the bundle, or
        # when it is not older than the stored copy, i.e. equivalent to:
        #   `snapshot.read_time >= original_document.snapshot.read_time`
        should_use_snapshot: bool = (
            original_document is None
            or _helpers.compare_timestamps(
                snapshot.read_time, original_document.snapshot.read_time,
            )
            >= 0
        )

        if should_use_snapshot:
            self.documents[full_document_path] = _BundledDocument(
                snapshot=snapshot,
                metadata=BundledDocumentMetadata(
                    name=full_document_path,
                    read_time=snapshot.read_time,
                    exists=snapshot.exists,
                    queries=original_queries,
                ),
            )

        self._update_last_read_time(snapshot.read_time)
        self._reset_metadata()
        return self

    def add_named_query(self, name: str, query: BaseQuery) -> "FirestoreBundle":
        """Adds a query to the bundle, referenced by the provided name.

        The query is executed immediately and every resulting document is
        stored in the bundle.

        Args:
            name (str): The name by which the provided query should be referenced.
            query (Query): Query of documents to be fully loaded and stored in
                the bundle for future access.

        Example:

            from google.cloud import firestore
            from google.cloud.firestore_bundle import FirestoreBundle

            db = firestore.Client()
            collection_ref = db.collection(u'users')

            bundle = FirestoreBundle('my bundle')
            bundle.add_named_query('all the users', collection_ref._query())

        Returns:
            FirestoreBundle: self

        Raises:
            ValueError: If anything other than a BaseQuery (e.g., a Collection)
                is supplied. If you have a Collection, call its `_query()`
                method to get what this method expects.
            ValueError: If the supplied name has already been added.
        """
        if not isinstance(query, BaseQuery):
            raise ValueError(
                "Attempted to add named query of type: "
                f"{type(query).__name__}. Expected BaseQuery.",
            )

        if name in self.named_queries:
            raise ValueError(f"Query name conflict: {name} has already been added.")

        # Execute the query and save each resulting document
        _read_time = self._save_documents_from_query(query, query_name=name)

        # Actually save the query to our local object cache
        self._save_named_query(name, query, _read_time)
        self._reset_metadata()
        return self

    def _add_query_document(self, doc: DocumentSnapshot, query_name: str) -> None:
        """Stores one query result and tags it with the query that produced it."""
        self.add_document(doc)
        # `add_document` guarantees an entry exists for this path, whether or
        # not the incoming snapshot replaced the stored one.
        bundled_document = self.documents[doc.reference._document_path]
        bundled_document.metadata.queries.append(query_name)  # type: ignore

    def _save_documents_from_query(
        self, query: BaseQuery, query_name: str
    ) -> datetime.datetime:
        """Runs `query` and adds every returned document to the bundle.

        Args:
            query (BaseQuery): The query to execute. `AsyncQuery` instances are
                driven to completion on the current event loop.
            query_name (str): Name recorded against each returned document.

        Returns:
            datetime.datetime: The read time of the last document streamed, or
                the minimum UTC datetime if the query produced no documents.
        """
        if isinstance(query, AsyncQuery):
            import asyncio

            # NOTE(review): `run_until_complete` cannot be used on a loop that
            # is already running, so this path presumably expects to be called
            # from synchronous code - confirm against callers.
            loop = asyncio.get_event_loop()
            return loop.run_until_complete(self._process_async_query(query, query_name))

        # `query` is now known to be a non-async `BaseQuery`
        _read_time = datetime.datetime.min.replace(tzinfo=UTC)
        doc: DocumentSnapshot
        for doc in query.stream():  # type: ignore
            self._add_query_document(doc, query_name)
            _read_time = doc.read_time
        return _read_time

    def _save_named_query(
        self, name: str, query: BaseQuery, read_time: datetime.datetime,
    ) -> None:
        """Caches the named query and folds its read time into the bundle's."""
        self.named_queries[name] = self._build_named_query(
            name=name, snapshot=query, read_time=read_time,
        )
        self._update_last_read_time(read_time)

    async def _process_async_query(
        self, snapshot: AsyncQuery, query_name: str,
    ) -> datetime.datetime:
        """Async counterpart of the streaming loop in `_save_documents_from_query`.

        Returns:
            datetime.datetime: The read time of the last document streamed, or
                the minimum UTC datetime if the query produced no documents.
        """
        _read_time = datetime.datetime.min.replace(tzinfo=UTC)
        doc: DocumentSnapshot
        async for doc in snapshot.stream():
            self._add_query_document(doc, query_name)
            _read_time = doc.read_time
        return _read_time

    def _build_named_query(
        self, name: str, snapshot: BaseQuery, read_time: datetime.datetime,
    ) -> NamedQuery:
        """Wraps a query, its name and its read time in a `NamedQuery` message."""
        return NamedQuery(
            name=name,
            bundled_query=BundledQuery(
                parent=name,
                structured_query=snapshot._to_protobuf()._pb,
                limit_type=limit_type_of_query(snapshot),
            ),
            read_time=_helpers.build_timestamp(read_time),
        )

    def _update_last_read_time(
        self, read_time: Union[datetime.datetime, Timestamp]
    ) -> None:
        """Advances `latest_read_time` if `read_time` is more recent.

        Args:
            read_time (Union[datetime.datetime, Timestamp]): Candidate read
                time; datetimes are converted to protobuf Timestamps first.
        """
        _ts: Timestamp = (
            read_time
            if isinstance(read_time, Timestamp)
            else _datetime_to_pb_timestamp(read_time)
        )

        # if `_ts` is greater than `self.latest_read_time`
        if _helpers.compare_timestamps(_ts, self.latest_read_time) == 1:
            self.latest_read_time = _ts

    def _add_bundle_element(self, bundle_element: BundleElement, *, client: BaseClient, type: str) -> None:  # type: ignore
        """Applies BundleElements to this FirestoreBundle instance as a part of
        deserializing a FirestoreBundle string.

        Args:
            bundle_element (BundleElement): The element to apply.
            client (BaseClient): Client used to decode document fields and to
                build document references.
            type (str): Which field of `bundle_element` is populated; one of
                "metadata", "namedQuery", "documentMetadata" or "document".

        Raises:
            KeyError: If a "document" element is applied before the
                "documentMetadata" element carrying the same document name.
            ValueError: If `type` is not one of the supported values.
        """
        from google.cloud.firestore_v1.types.document import Document

        if type == "metadata":
            self._deserialized_metadata = bundle_element.metadata  # type: ignore
        elif type == "namedQuery":
            self.named_queries[bundle_element.named_query.name] = bundle_element.named_query  # type: ignore
        elif type == "documentMetadata":
            self._doc_metadata_map[
                bundle_element.document_metadata.name
            ] = bundle_element.document_metadata
        elif type == "document":
            # The read time and query names live on the metadata element that
            # was recorded earlier under the same document name.
            doc_metadata = self._doc_metadata_map[bundle_element.document.name]
            doc_ref_value = _helpers.DocumentReferenceValue(
                bundle_element.document.name
            )
            snapshot = DocumentSnapshot(
                data=_helpers.decode_dict(
                    Document(mapping=bundle_element.document).fields, client
                ),
                exists=True,
                reference=DocumentReference(
                    doc_ref_value.collection_name,
                    doc_ref_value.document_id,
                    client=client,
                ),
                read_time=doc_metadata.read_time,
                create_time=bundle_element.document.create_time,  # type: ignore
                update_time=bundle_element.document.update_time,  # type: ignore
            )
            self.add_document(snapshot)

            bundled_document = self.documents.get(snapshot.reference._document_path)
            for query_name in doc_metadata.queries:
                bundled_document.metadata.queries.append(query_name)  # type: ignore
        else:
            raise ValueError(f"Unexpected type of BundleElement: {type}")

    def build(self) -> str:
        """Iterates over the bundle's stored documents and queries and produces
        a single length-prefixed json string suitable for long-term storage.

        The output starts with the bundle metadata element, followed by every
        named query, followed by a metadata element and a document element for
        each stored document.

        Example:

            from google.cloud import firestore
            from google.cloud.firestore_bundle import FirestoreBundle

            db = firestore.Client()
            collection_ref = db.collection(u'users')

            bundle = FirestoreBundle('my bundle')
            bundle.add_named_query('app-users', collection_ref._query())

            serialized_bundle: str = bundle.build()

            # Now upload `serialized_bundle` to Google Cloud Storage, store it
            # in Memorystore, or any other storage solution.

        Returns:
            str: The length-prefixed string representation of this bundle's
                contents.
        """
        # Collect the pieces and join once rather than growing a string.
        parts: List[str] = [
            self._compile_bundle_element(BundleElement(named_query=named_query))
            for named_query in self.named_queries.values()
        ]

        bundled_document: "_BundledDocument"  # type: ignore
        for bundled_document in self.documents.values():
            parts.append(
                self._compile_bundle_element(
                    BundleElement(document_metadata=bundled_document.metadata)
                )
            )
            parts.append(
                self._compile_bundle_element(
                    BundleElement(document=bundled_document.snapshot._to_protobuf()._pb,)
                )
            )

        buffer: str = "".join(parts)

        # Metadata captured during deserialization is reused as-is; otherwise
        # it is computed from the elements serialized above.
        metadata: BundleElement = BundleElement(
            metadata=self._deserialized_metadata
            or BundleMetadata(
                id=self.name,
                create_time=_helpers.build_timestamp(),
                version=FirestoreBundle.BUNDLE_SCHEMA_VERSION,
                total_documents=len(self.documents),
                total_bytes=len(buffer.encode("utf-8")),
            )
        )
        return f"{self._compile_bundle_element(metadata)}{buffer}"

    def _compile_bundle_element(self, bundle_element: BundleElement) -> str:
        """Serializes one element to JSON, prefixed with the JSON's length."""
        serialized_be = json.dumps(json_format.MessageToDict(bundle_element._pb))
        # `json.dumps` escapes non-ASCII characters by default, so this
        # character count matches the UTF-8 byte count used for `total_bytes`.
        return f"{len(serialized_be)}{serialized_be}"

    def _reset_metadata(self) -> None:
        """Hydrating bundles stores cached data we must reset anytime new
        queries or documents are added"""
        self._deserialized_metadata = None
| 353 | + |
class _BundledDocument:
    """Pairs a document snapshot with the bundle metadata that describes it.

    Args:
        snapshot (DocumentSnapshot): The document content to be bundled.
        metadata (BundledDocumentMetadata): The metadata stored alongside
            that document in the bundle.
    """

    def __init__(
        self,
        snapshot: DocumentSnapshot,
        metadata: BundledDocumentMetadata,
    ) -> None:
        self.snapshot: DocumentSnapshot = snapshot
        self.metadata: BundledDocumentMetadata = metadata
0 commit comments