     BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.transfers.trino_to_gcs import TrinoToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_trino_to_gcs"

 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'example-project')
-GCS_BUCKET = os.environ.get("GCP_TRINO_TO_GCS_BUCKET_NAME", "INVALID BUCKET NAME")
-DATASET_NAME = os.environ.get("GCP_TRINO_TO_GCS_DATASET_NAME", "test_trino_to_gcs_dataset")
+GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"

 SOURCE_MULTIPLE_TYPES = "memory.default.test_multiple_types"
 SOURCE_CUSTOMER_TABLE = "tpch.sf1.customer"
@@ -47,17 +51,19 @@ def safe_name(s: str) -> str:


 with models.DAG(
-    dag_id="example_trino_to_gcs",
+    dag_id=DAG_ID,
     schedule_interval='@once',  # Override to match your needs
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "gcs"],
 ) as dag:
-
     create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

     delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )

     # [START howto_operator_trino_to_gcs_basic]
@@ -179,15 +185,29 @@ def safe_name(s: str) -> str:
     )
     # [END howto_operator_trino_to_gcs_csv]

-    create_dataset >> trino_to_gcs_basic
-    create_dataset >> trino_to_gcs_multiple_types
-    create_dataset >> trino_to_gcs_many_chunks
-    create_dataset >> trino_to_gcs_csv
+    (
+        # TEST SETUP
+        create_dataset
+        # TEST BODY
+        >> trino_to_gcs_basic
+        >> trino_to_gcs_multiple_types
+        >> trino_to_gcs_many_chunks
+        >> trino_to_gcs_csv
+        >> create_external_table_multiple_types
+        >> create_external_table_many_chunks
+        >> read_data_from_gcs_multiple_types
+        >> read_data_from_gcs_many_chunks
+        # TEST TEARDOWN
+        >> delete_dataset
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

-    trino_to_gcs_multiple_types >> create_external_table_multiple_types >> read_data_from_gcs_multiple_types
-    trino_to_gcs_many_chunks >> create_external_table_many_chunks >> read_data_from_gcs_many_chunks
+from tests.system.utils import get_test_run  # noqa: E402

-    trino_to_gcs_basic >> delete_dataset
-    trino_to_gcs_csv >> delete_dataset
-    read_data_from_gcs_multiple_types >> delete_dataset
-    read_data_from_gcs_many_chunks >> delete_dataset
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
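
Note on the `list(dag.tasks) >> watcher()` line: because the `delete_dataset` teardown now runs with `TriggerRule.ALL_DONE`, it succeeds even when a test-body task fails, so without a watcher the DAG run could still end up marked successful. The task imported from `tests/system/utils/watcher.py` fires when any upstream task fails and then fails itself. A minimal sketch of that pattern (a standalone illustration, not the code from this PR) could look like:

from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Runs only when at least one upstream task has failed, and fails itself so the
    # DAG run is marked as failed even though the teardown task succeeded.
    raise AirflowException("Failing task because one or more upstream tasks failed.")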
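Note on the new resource names: instead of reading the bucket and dataset from `GCP_TRINO_TO_GCS_BUCKET_NAME` / `GCP_TRINO_TO_GCS_DATASET_NAME`, both are now derived from `SYSTEM_TESTS_ENV_ID` and the DAG id, so each test environment gets its own resources. A small illustration of how the names resolve (the environment value below is hypothetical):

import os

# Hypothetical value, set here only to show the naming scheme
os.environ["SYSTEM_TESTS_ENV_ID"] = "abc123"

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_trino_to_gcs"

GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"      # "bucket_example_trino_to_gcs_abc123"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"   # "dataset_example_trino_to_gcs_abc123"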