Commit 42841f7

Migration of System Tests: Cloud BigQuery Data Transfer (AIP-47) (#27312)
1 parent 27a92fe commit 42841f7

File tree

3 files changed (+105, -110 lines)


docs/apache-airflow-providers-google/operators/cloud/bigquery_dts.rst

Lines changed: 5 additions & 5 deletions
@@ -47,15 +47,15 @@ for example :class:`~airflow.providers.google.cloud.operators.bigquery_dts.BigQu
 scheduling option is present in passed configuration. If present then nothing is done, otherwise it's value is
 set to ``True``.

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py
     :language: python
     :start-after: [START howto_bigquery_dts_create_args]
     :end-before: [END howto_bigquery_dts_create_args]

 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Basic usage of the operator:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py
     :language: python
     :dedent: 4
     :start-after: [START howto_bigquery_create_data_transfer]
@@ -78,7 +78,7 @@ To delete DTS transfer configuration you can use

 Basic usage of the operator:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py
     :language: python
     :dedent: 4
     :start-after: [START howto_bigquery_delete_data_transfer]
@@ -99,7 +99,7 @@ Start manual transfer runs to be executed now with schedule_time equal to curren

 Basic usage of the operator:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py
     :language: python
     :dedent: 4
     :start-after: [START howto_bigquery_start_transfer]
@@ -112,7 +112,7 @@ parameters which allows you to dynamically determine values.
 To check if operation succeeded you can use
 :class:`~airflow.providers.google.cloud.sensors.bigquery_dts.BigQueryDataTransferServiceTransferRunSensor`.

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py
     :language: python
     :dedent: 4
     :start-after: [START howto_bigquery_dts_sensor]
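
The documentation above walks through the four steps of the DTS workflow: create a transfer config with auto-scheduling disabled (project_id may be omitted, in which case it is taken from the Google Cloud connection), start a manual run, wait for it with the sensor, and finally delete the config. A minimal sketch of that workflow is shown below; it is an illustration only, not the contents of the migrated example file, and the bucket, dataset and table names are placeholders.

# Illustrative sketch of the operators covered by the doc above; the
# authoritative example lives in
# tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py.
import time
from datetime import datetime
from typing import cast

from airflow import models
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.operators.bigquery_dts import (
    BigQueryCreateDataTransferOperator,
    BigQueryDataTransferServiceStartTransferRunsOperator,
    BigQueryDeleteDataTransferConfigOperator,
)
from airflow.providers.google.cloud.sensors.bigquery_dts import BigQueryDataTransferServiceTransferRunSensor

# Placeholder values - replace with a real bucket, dataset and table.
TRANSFER_CONFIG = {
    "destination_dataset_id": "my_dataset",
    "display_name": "gcs to bigquery sketch",
    "data_source_id": "google_cloud_storage",
    "schedule_options": {"disable_auto_scheduling": True},
    "params": {
        "field_delimiter": ",",
        "skip_leading_rows": "1",
        "data_path_template": "gs://my-bucket/my-file.csv",
        "destination_table_name_template": "my_table",
        "file_format": "CSV",
    },
}

with models.DAG(
    "bigquery_dts_usage_sketch",
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # project_id is omitted on purpose: the operator falls back to the
    # project configured on the Google Cloud connection, as the doc notes.
    create_transfer = BigQueryCreateDataTransferOperator(
        task_id="create_transfer",
        transfer_config=TRANSFER_CONFIG,
    )

    # The created config id is exposed through XCom; XComArg passes it to
    # the downstream tasks without hard-coding it.
    transfer_config_id = cast(str, XComArg(create_transfer, key="transfer_config_id"))

    start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
        task_id="start_transfer",
        transfer_config_id=transfer_config_id,
        requested_run_time={"seconds": int(time.time() + 60)},
    )

    wait_for_run = BigQueryDataTransferServiceTransferRunSensor(
        task_id="wait_for_run",
        transfer_config_id=transfer_config_id,
        run_id=cast(str, XComArg(start_transfer, key="run_id")),
        expected_statuses={"SUCCEEDED"},
    )

    delete_transfer = BigQueryDeleteDataTransferConfigOperator(
        task_id="delete_transfer",
        transfer_config_id=transfer_config_id,
    )

    wait_for_run >> delete_transfer

Because auto-scheduling is disabled in the config, runs happen only when the start-runs operator requests them, which is exactly the pattern the first howto block in the doc describes.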

tests/providers/google/cloud/operators/test_bigquery_dts_system.py

Lines changed: 0 additions & 82 deletions
This file was deleted.

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py renamed to tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py

Lines changed: 100 additions & 23 deletions
@@ -23,58 +23,97 @@
 import os
 import time
 from datetime import datetime
+from pathlib import Path
 from typing import cast

 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.models.xcom_arg import XComArg
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateEmptyTableOperator,
+    BigQueryDeleteDatasetOperator,
+)
 from airflow.providers.google.cloud.operators.bigquery_dts import (
     BigQueryCreateDataTransferOperator,
     BigQueryDataTransferServiceStartTransferRunsOperator,
     BigQueryDeleteDataTransferConfigOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.bigquery_dts import BigQueryDataTransferServiceTransferRunSensor
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_gcp_bigquery_dts"

-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-BUCKET_URI = os.environ.get("GCP_DTS_BUCKET_URI", "gs://INVALID BUCKET NAME/bank-marketing.csv")
-GCP_DTS_BQ_DATASET = os.environ.get("GCP_DTS_BQ_DATASET", "test_dts")
-GCP_DTS_BQ_TABLE = os.environ.get("GCP_DTS_BQ_TABLE", "GCS_Test")
+BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}"
+
+FILE_NAME = "us-states.csv"
+CURRENT_FOLDER = Path(__file__).parent
+FILE_LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources" / FILE_NAME)
+BUCKET_URI = f"gs://{BUCKET_NAME}/{FILE_NAME}"
+
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+DTS_BQ_TABLE = "DTS_BQ_TABLE"

 # [START howto_bigquery_dts_create_args]

 # In the case of Airflow, the customer needs to create a transfer
 # config with the automatic scheduling disabled and then trigger
 # a transfer run using a specialized Airflow operator
-schedule_options = {"disable_auto_scheduling": True}
-
-PARAMS = {
-    "field_delimiter": ",",
-    "max_bad_records": "0",
-    "skip_leading_rows": "1",
-    "data_path_template": BUCKET_URI,
-    "destination_table_name_template": GCP_DTS_BQ_TABLE,
-    "file_format": "CSV",
-}
-
 TRANSFER_CONFIG = {
-    "destination_dataset_id": GCP_DTS_BQ_DATASET,
-    "display_name": "GCS Test Config",
+    "destination_dataset_id": DATASET_NAME,
+    "display_name": "test data transfer",
     "data_source_id": "google_cloud_storage",
-    "schedule_options": schedule_options,
-    "params": PARAMS,
+    "schedule_options": {"disable_auto_scheduling": True},
+    "params": {
+        "field_delimiter": ",",
+        "max_bad_records": "0",
+        "skip_leading_rows": "1",
+        "data_path_template": BUCKET_URI,
+        "destination_table_name_template": DTS_BQ_TABLE,
+        "file_format": "CSV",
+    },
 }

 # [END howto_bigquery_dts_create_args]

 with models.DAG(
-    "example_gcp_bigquery_dts",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "bigquery"],
 ) as dag:
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=FILE_LOCAL_PATH,
+        dst=FILE_NAME,
+        bucket=BUCKET_NAME,
+    )
+    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
+
+    create_table = BigQueryCreateEmptyTableOperator(
+        task_id="create_table",
+        dataset_id=DATASET_NAME,
+        table_id=DTS_BQ_TABLE,
+        schema_fields=[
+            {"name": "name", "type": "STRING", "mode": "REQUIRED"},
+            {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
+        ],
+    )
+
     # [START howto_bigquery_create_data_transfer]
     gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
         transfer_config=TRANSFER_CONFIG,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         task_id="gcp_bigquery_create_transfer",
     )

@@ -103,11 +142,49 @@
         transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
     )
     # [END howto_bigquery_delete_data_transfer]
+    gcp_bigquery_delete_transfer.trigger_rule = TriggerRule.ALL_DONE
+
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )

-    gcp_run_sensor >> gcp_bigquery_delete_transfer
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )

     # Task dependencies created via `XComArgs`:
     # gcp_bigquery_create_transfer >> gcp_bigquery_start_transfer
     # gcp_bigquery_create_transfer >> gcp_run_sensor
     # gcp_bigquery_start_transfer >> gcp_run_sensor
     # gcp_bigquery_create_transfer >> gcp_bigquery_delete_transfer
+
+    chain(
+        # TEST SETUP
+        create_bucket,
+        upload_file,
+        create_dataset,
+        create_table,
+        # TEST BODY
+        gcp_bigquery_create_transfer,
+        gcp_bigquery_start_transfer,
+        gcp_run_sensor,
+        gcp_bigquery_delete_transfer,
+        # TEST TEARDOWN
+        delete_dataset,
+        delete_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
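
The watcher and get_test_run lines added at the bottom are the standard AIP-47 wiring. The teardown tasks run with trigger_rule=ALL_DONE, so they succeed even when a test-body task fails; without a watcher the DAG run could still end up marked as successful. The sketch below shows the idea behind the watcher; the real helper is imported from tests.system.utils.watcher and may differ in detail.

# Rough illustration of the watcher pattern, not a copy of
# tests/system/utils/watcher.py: a task that only runs when something
# upstream failed, and then fails the whole DAG run.
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    """Fail the DAG run when any upstream (watched) task has failed."""
    raise AirflowException("Failing task because one or more upstream tasks failed.")

Wiring it up with list(dag.tasks) >> watcher() makes every task in the DAG upstream of the watcher, so a failure anywhere still fails the run even though the teardown tasks themselves succeed. The test_run = get_test_run(dag) line is what lets pytest execute the DAG directly, e.g. roughly pytest tests/system/providers/google/cloud/bigquery/example_bigquery_dts.py, as described in tests/system/README.md#run_via_pytest.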
