|
21 | 21 |
|
22 | 22 | from cached_property import cached_property
|
23 | 23 | from google.api_core.gapic_v1.client_info import ClientInfo
|
| 24 | +from google.auth.credentials import Credentials |
24 | 25 | from google.cloud import logging as gcp_logging
|
| 26 | +from google.cloud.logging import Resource |
25 | 27 | from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport
|
26 |
| -from google.cloud.logging.resource import Resource |
| 28 | +from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client |
| 29 | +from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse |
27 | 30 |
|
28 | 31 | from airflow import version
|
29 | 32 | from airflow.models import TaskInstance
|
@@ -102,18 +105,33 @@ def __init__(
|
102 | 105 | self.task_instance_hostname = 'default-hostname'
|
103 | 106 |
|
104 | 107 | @cached_property
|
105 |
| - def _client(self) -> gcp_logging.Client: |
106 |
| - """Google Cloud Library API client""" |
| 108 | + def _credentials_and_project(self) -> Tuple[Credentials, str]: |
107 | 109 | credentials, project = get_credentials_and_project_id(
|
108 | 110 | key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
|
109 | 111 | )
|
| 112 | + return credentials, project |
| 113 | + |
| 114 | + @property |
| 115 | + def _client(self) -> gcp_logging.Client: |
| 116 | + """The Cloud Library API client""" |
| 117 | + credentials, project = self._credentials_and_project |
110 | 118 | client = gcp_logging.Client(
|
111 | 119 | credentials=credentials,
|
112 | 120 | project=project,
|
113 | 121 | client_info=ClientInfo(client_library_version='airflow_v' + version.version),
|
114 | 122 | )
|
115 | 123 | return client
|
116 | 124 |
|
| 125 | + @property |
| 126 | + def _logging_service_client(self) -> LoggingServiceV2Client: |
| 127 | + """The Cloud logging service v2 client.""" |
| 128 | + credentials, _ = self._credentials_and_project |
| 129 | + client = LoggingServiceV2Client( |
| 130 | + credentials=credentials, |
| 131 | + client_info=ClientInfo(client_library_version='airflow_v' + version.version), |
| 132 | + ) |
| 133 | + return client |
| 134 | + |
117 | 135 | @cached_property
|
118 | 136 | def _transport(self) -> Transport:
|
119 | 137 | """Object responsible for sending data to Stackdriver"""
|
@@ -214,9 +232,10 @@ def escale_label_value(value: str) -> str:
|
214 | 232 | escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
|
215 | 233 | return f'"{escaped_value}"'
|
216 | 234 |
|
| 235 | + _, project = self._credentials_and_project |
217 | 236 | log_filters = [
|
218 | 237 | f'resource.type={escale_label_value(self.resource.type)}',
|
219 |
| - f'logName="projects/{self._client.project}/logs/{self.name}"', |
| 238 | + f'logName="projects/{project}/logs/{self.name}"', |
220 | 239 | ]
|
221 | 240 |
|
222 | 241 | for key, value in self.resource.labels.items():
|
@@ -277,17 +296,21 @@ def _read_single_logs_page(self, log_filter: str, page_token: Optional[str] = No
|
277 | 296 | :return: Downloaded logs and next page token
|
278 | 297 | :rtype: Tuple[str, str]
|
279 | 298 | """
|
280 |
| - entries = self._client.list_entries( |
281 |
| - filter_=log_filter, page_token=page_token, order_by='timestamp asc', page_size=1000 |
| 299 | + _, project = self._credentials_and_project |
| 300 | + request = ListLogEntriesRequest( |
| 301 | + resource_names=[f'projects/{project}'], |
| 302 | + filter=log_filter, |
| 303 | + page_token=page_token, |
| 304 | + order_by='timestamp asc', |
| 305 | + page_size=1000, |
282 | 306 | )
|
283 |
| - page = next(entries.pages) |
284 |
| - next_page_token = entries.next_page_token |
| 307 | + response = self._logging_service_client.list_log_entries(request=request) |
| 308 | + page: ListLogEntriesResponse = next(response.pages) |
285 | 309 | messages = []
|
286 |
| - for entry in page: |
287 |
| - if "message" in entry.payload: |
288 |
| - messages.append(entry.payload["message"]) |
289 |
| - |
290 |
| - return "\n".join(messages), next_page_token |
| 310 | + for entry in page.entries: |
| 311 | + if "message" in entry.json_payload: |
| 312 | + messages.append(entry.json_payload["message"]) |
| 313 | + return "\n".join(messages), page.next_page_token |
291 | 314 |
|
292 | 315 | @classmethod
|
293 | 316 | def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
|
@@ -323,7 +346,7 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) ->
|
323 | 346 | :return: URL to the external log collection service
|
324 | 347 | :rtype: str
|
325 | 348 | """
|
326 |
| - project_id = self._client.project |
| 349 | + _, project_id = self._credentials_and_project |
327 | 350 |
|
328 | 351 | ti_labels = self._task_instance_to_labels(task_instance)
|
329 | 352 | ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
|
|
0 commit comments