Skip to content

Commit c7072c0

Browse files
bkossakowskaBeata Kossakowskaeladkal
authored
Fix BIGQUERY_JOB_DETAILS_LINK_FMT in BigQueryConsoleLink (#31457)
Co-authored-by: Beata Kossakowska <[email protected]> Co-authored-by: eladkal <[email protected]>
1 parent 6673a40 commit c7072c0

File tree

2 files changed

+77
-30
lines changed

2 files changed

+77
-30
lines changed

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
from airflow.models.taskinstancekey import TaskInstanceKey
5959
from airflow.utils.context import Context
6060

61-
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
61+
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"
6262

6363

6464
class BigQueryUIColors(enum.Enum):
@@ -90,8 +90,17 @@ def get_link(
9090
*,
9191
ti_key: TaskInstanceKey,
9292
):
93-
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
94-
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
93+
job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
94+
95+
return (
96+
BIGQUERY_JOB_DETAILS_LINK_FMT.format(
97+
job_id=job_id_params["job_id"],
98+
project_id=job_id_params["project_id"],
99+
location=job_id_params["location"],
100+
)
101+
if job_id_params
102+
else ""
103+
)
95104

96105

97106
@attr.s(auto_attribs=True)
@@ -110,13 +119,16 @@ def get_link(
110119
*,
111120
ti_key: TaskInstanceKey,
112121
):
113-
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
122+
job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
123+
job_ids = job_ids_params["job_id"]
114124
if not job_ids:
115125
return None
116126
if len(job_ids) < self.index:
117127
return None
118128
job_id = job_ids[self.index]
119-
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
129+
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
130+
job_id=job_id, project_id=job_ids_params["project_id"], location=job_ids_params["location"]
131+
)
120132

121133

122134
class _BigQueryDbHookMixin:
@@ -1184,7 +1196,13 @@ def execute(self, context: Context):
11841196
]
11851197
else:
11861198
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
1187-
context["task_instance"].xcom_push(key="job_id", value=job_id)
1199+
job_id_params = {
1200+
"job_id": job_id,
1201+
"project_id": self.hook.project_id,
1202+
"location": self.location if self.location else "US",
1203+
}
1204+
context["task_instance"].xcom_push(key="job_id_params", value=job_id_params)
1205+
return job_id
11881206

11891207
def on_kill(self) -> None:
11901208
super().on_kill()
@@ -2727,9 +2745,13 @@ def execute(self, context: Any):
27272745
persist_kwargs["dataset_id"] = table["datasetId"]
27282746
persist_kwargs["project_id"] = table["projectId"]
27292747
BigQueryTableLink.persist(**persist_kwargs)
2730-
27312748
self.job_id = job.job_id
2732-
context["ti"].xcom_push(key="job_id", value=self.job_id)
2749+
job_id_params = {
2750+
"job_id": job_id,
2751+
"project_id": self.project_id or self.hook.project_id,
2752+
"location": self.location if self.location else "US",
2753+
}
2754+
context["ti"].xcom_push(key="job_id_params", value=job_id_params)
27332755
# Wait for the job to complete
27342756
if not self.deferrable:
27352757
job.result(timeout=self.result_timeout, retry=self.result_retry)

tests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@
8383
}
8484
TEST_TABLE = "test-table"
8585
GCP_CONN_ID = "google_cloud_default"
86+
TEST_JOB_ID_1 = "test-job-id"
87+
TEST_JOB_ID_2 = "test-123"
88+
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
89+
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
8690

8791

8892
class TestBigQueryCreateEmptyTableOperator:
@@ -672,11 +676,15 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(
672676

673677
# Check DeSerialized version of operator link
674678
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
675-
676-
ti.xcom_push("job_id", 12345)
679+
test_job_id_params = {
680+
"job_id": TEST_JOB_ID_1,
681+
"project_id": TEST_GCP_PROJECT_ID,
682+
"location": TEST_DATASET_LOCATION,
683+
}
684+
ti.xcom_push("job_id_params", test_job_id_params)
677685

678686
url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
679-
assert url == "https://console.cloud.google.com/bigquery?j=12345"
687+
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
680688

681689
@pytest.mark.need_serialized_dag
682690
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -711,17 +719,23 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
711719
# Check DeSerialized version of operator link
712720
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)
713721

714-
job_id = ["123", "45"]
715-
ti.xcom_push(key="job_id", value=job_id)
722+
test_job_id_params = {
723+
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
724+
"project_id": TEST_GCP_PROJECT_ID,
725+
"location": TEST_DATASET_LOCATION,
726+
}
727+
ti.xcom_push(key="job_id_params", value=test_job_id_params)
716728

717729
assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()
718730

719-
assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
720-
ti, "BigQuery Console #1"
731+
assert (
732+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
733+
== simple_task.get_extra_links(ti, "BigQuery Console #1")
721734
)
722735

723-
assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
724-
ti, "BigQuery Console #2"
736+
assert (
737+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
738+
== simple_task.get_extra_links(ti, "BigQuery Console #2")
725739
)
726740

727741
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -740,7 +754,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(
740754

741755
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
742756
def test_bigquery_operator_extra_link_when_single_query(
743-
self, mock_hook, create_task_instance_of_operator
757+
self,
758+
mock_hook,
759+
create_task_instance_of_operator,
744760
):
745761
ti = create_task_instance_of_operator(
746762
BigQueryExecuteQueryOperator,
@@ -751,11 +767,15 @@ def test_bigquery_operator_extra_link_when_single_query(
751767
)
752768
bigquery_task = ti.task
753769

754-
job_id = "12345"
755-
ti.xcom_push(key="job_id", value=job_id)
756-
757-
assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
758-
ti, BigQueryConsoleLink.name
770+
test_job_id_params = {
771+
"job_id": TEST_JOB_ID_1,
772+
"project_id": TEST_GCP_PROJECT_ID,
773+
"location": TEST_DATASET_LOCATION,
774+
}
775+
ti.xcom_push(key="job_id_params", value=test_job_id_params)
776+
assert (
777+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
778+
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
759779
)
760780

761781
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -771,17 +791,22 @@ def test_bigquery_operator_extra_link_when_multiple_query(
771791
)
772792
bigquery_task = ti.task
773793

774-
job_id = ["123", "45"]
775-
ti.xcom_push(key="job_id", value=job_id)
776-
794+
test_job_id_params = {
795+
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
796+
"project_id": TEST_GCP_PROJECT_ID,
797+
"location": TEST_DATASET_LOCATION,
798+
}
799+
ti.xcom_push(key="job_id_params", value=test_job_id_params)
777800
assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()
778801

779-
assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
780-
ti, "BigQuery Console #1"
802+
assert (
803+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
804+
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
781805
)
782806

783-
assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
784-
ti, "BigQuery Console #2"
807+
assert (
808+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
809+
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
785810
)
786811

787812

0 commit comments

Comments
 (0)