|
20 | 20 | from datetime import datetime
|
21 | 21 |
|
22 | 22 | from airflow import models
|
| 23 | +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator |
23 | 24 | from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
|
| 25 | +from airflow.utils.trigger_rule import TriggerRule |
24 | 26 |
|
25 |
| -BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo") |
| 27 | +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") |
| 28 | +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") |
| 29 | +DAG_ID = "example_sheets_to_gcs" |
| 30 | + |
| 31 | +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" |
26 | 32 | SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
|
27 | 33 |
|
28 | 34 | with models.DAG(
|
29 |
| - "example_sheets_to_gcs", |
| 35 | + DAG_ID, |
30 | 36 | schedule_interval='@once', # Override to match your needs
|
31 | 37 | start_date=datetime(2021, 1, 1),
|
32 | 38 | catchup=False,
|
33 |
| - tags=["example"], |
| 39 | + tags=["example", "sheets"], |
34 | 40 | ) as dag:
|
| 41 | + create_bucket = GCSCreateBucketOperator( |
| 42 | + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID |
| 43 | + ) |
| 44 | + |
35 | 45 | # [START upload_sheet_to_gcs]
|
36 | 46 | upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
|
37 | 47 | task_id="upload_sheet_to_gcs",
|
38 |
| - destination_bucket=BUCKET, |
| 48 | + destination_bucket=BUCKET_NAME, |
39 | 49 | spreadsheet_id=SPREADSHEET_ID,
|
40 | 50 | )
|
41 | 51 | # [END upload_sheet_to_gcs]
|
| 52 | + |
| 53 | + delete_bucket = GCSDeleteBucketOperator( |
| 54 | + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE |
| 55 | + ) |
| 56 | + |
| 57 | + ( |
| 58 | + # TEST SETUP |
| 59 | + create_bucket |
| 60 | + # TEST BODY |
| 61 | + >> upload_sheet_to_gcs |
| 62 | + # TEST TEARDOWN |
| 63 | + >> delete_bucket |
| 64 | + ) |
| 65 | + |
| 66 | + from tests.system.utils.watcher import watcher |
| 67 | + |
| 68 | + # This test needs watcher in order to properly mark success/failure |
| 69 | + # when "tearDown" task with trigger rule is part of the DAG |
| 70 | + list(dag.tasks) >> watcher() |
| 71 | + |
| 72 | +from tests.system.utils import get_test_run # noqa: E402 |
| 73 | + |
| 74 | +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) |
| 75 | +test_run = get_test_run(dag) |
0 commit comments