|
20 | 20 | from datetime import datetime
|
21 | 21 |
|
22 | 22 | from airflow import models
|
| 23 | +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator |
23 | 24 | from airflow.providers.google.cloud.transfers.calendar_to_gcs import GoogleCalendarToGCSOperator
|
| 25 | +from airflow.utils.trigger_rule import TriggerRule |
24 | 26 |
|
25 |
| -BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo") |
| 27 | +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") |
| 28 | +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") |
| 29 | +DAG_ID = "example_calendar_to_gcs" |
| 30 | + |
| 31 | +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" |
26 | 32 | CALENDAR_ID = os.environ.get("CALENDAR_ID", "1234567890qwerty")
|
27 | 33 | API_VERSION = "v3"
|
28 | 34 |
|
29 | 35 | with models.DAG(
|
30 |
| - "example_calendar_to_gcs", |
| 36 | + DAG_ID, |
31 | 37 | schedule_interval='@once', # Override to match your needs
|
32 |
| - start_date=datetime(2022, 1, 1), |
| 38 | + start_date=datetime(2021, 1, 1), |
33 | 39 | catchup=False,
|
34 |
| - tags=["example"], |
| 40 | + tags=["example", "calendar"], |
35 | 41 | ) as dag:
|
| 42 | + create_bucket = GCSCreateBucketOperator( |
| 43 | + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID |
| 44 | + ) |
| 45 | + |
36 | 46 | # [START upload_calendar_to_gcs]
|
37 | 47 | upload_calendar_to_gcs = GoogleCalendarToGCSOperator(
|
38 | 48 | task_id="upload_calendar_to_gcs",
|
39 |
| - destination_bucket=BUCKET, |
| 49 | + destination_bucket=BUCKET_NAME, |
40 | 50 | calendar_id=CALENDAR_ID,
|
41 | 51 | api_version=API_VERSION,
|
42 | 52 | )
|
43 | 53 | # [END upload_calendar_to_gcs]
|
| 54 | + |
| 55 | + delete_bucket = GCSDeleteBucketOperator( |
| 56 | + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE |
| 57 | + ) |
| 58 | + |
| 59 | + ( |
| 60 | + # TEST SETUP |
| 61 | + create_bucket |
| 62 | + # TEST BODY |
| 63 | + >> upload_calendar_to_gcs |
| 64 | + # TEST TEARDOWN |
| 65 | + >> delete_bucket |
| 66 | + ) |
| 67 | + |
| 68 | + from tests.system.utils.watcher import watcher |
| 69 | + |
| 70 | + # This test needs watcher in order to properly mark success/failure |
| 71 | + # when "tearDown" task with trigger rule is part of the DAG |
| 72 | + list(dag.tasks) >> watcher() |
| 73 | + |
| 74 | +from tests.system.utils import get_test_run # noqa: E402 |
| 75 | + |
| 76 | +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) |
| 77 | +test_run = get_test_run(dag) |
0 commit comments