|
24 | 24 | from airflow import models
|
25 | 25 | from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
|
26 | 26 | from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
|
| 27 | +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator |
| 28 | +from airflow.utils.trigger_rule import TriggerRule |
27 | 29 |
|
28 | 30 | # [START howto_google_ads_env_variables]
|
| 31 | +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") |
| 32 | +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") |
| 33 | + |
| 34 | +DAG_ID = "example_google_ads" |
| 35 | + |
| 36 | +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" |
29 | 37 | CLIENT_IDS = ["1111111111", "2222222222"]
|
30 |
| -BUCKET = os.environ.get("GOOGLE_ADS_BUCKET", "gs://INVALID BUCKET NAME") |
31 | 38 | GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
|
32 | 39 | GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
|
33 | 40 | QUERY = """
|
|
61 | 68 | "metrics.all_conversions.value",
|
62 | 69 | "metrics.cost_micros.value",
|
63 | 70 | ]
|
64 |
| - |
65 | 71 | # [END howto_google_ads_env_variables]
|
66 | 72 |
|
67 | 73 | with models.DAG(
|
68 |
| - "example_google_ads", |
69 |
| - schedule_interval=None, # Override to match your needs |
| 74 | + DAG_ID, |
| 75 | + schedule_interval='@once', |
70 | 76 | start_date=datetime(2021, 1, 1),
|
71 | 77 | catchup=False,
|
| 78 | + tags=["example", "ads"], |
72 | 79 | ) as dag:
|
| 80 | + create_bucket = GCSCreateBucketOperator( |
| 81 | + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID |
| 82 | + ) |
| 83 | + |
73 | 84 | # [START howto_google_ads_to_gcs_operator]
|
74 | 85 | run_operator = GoogleAdsToGcsOperator(
|
75 | 86 | client_ids=CLIENT_IDS,
|
76 | 87 | query=QUERY,
|
77 | 88 | attributes=FIELDS_TO_EXTRACT,
|
78 | 89 | obj=GCS_OBJ_PATH,
|
79 |
| - bucket=BUCKET, |
| 90 | + bucket=BUCKET_NAME, |
80 | 91 | task_id="run_operator",
|
81 | 92 | )
|
82 | 93 | # [END howto_google_ads_to_gcs_operator]
|
83 | 94 |
|
84 | 95 | # [START howto_ads_list_accounts_operator]
|
85 | 96 | list_accounts = GoogleAdsListAccountsOperator(
|
86 |
| - task_id="list_accounts", bucket=BUCKET, object_name=GCS_ACCOUNTS_CSV |
| 97 | + task_id="list_accounts", bucket=BUCKET_NAME, object_name=GCS_ACCOUNTS_CSV |
87 | 98 | )
|
88 | 99 | # [END howto_ads_list_accounts_operator]
|
| 100 | + |
| 101 | + delete_bucket = GCSDeleteBucketOperator( |
| 102 | + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE |
| 103 | + ) |
| 104 | + |
| 105 | + ( |
| 106 | + # TEST SETUP |
| 107 | + create_bucket |
| 108 | + # TEST BODY |
| 109 | + >> run_operator |
| 110 | + >> list_accounts |
| 111 | + # TEST TEARDOWN |
| 112 | + >> delete_bucket |
| 113 | + ) |
| 114 | + |
| 115 | + from tests.system.utils.watcher import watcher |
| 116 | + |
| 117 | + # This test needs watcher in order to properly mark success/failure |
| 118 | + # when "tearDown" task with trigger rule is part of the DAG |
| 119 | + list(dag.tasks) >> watcher() |
| 120 | + |
| 121 | + |
| 122 | +from tests.system.utils import get_test_run # noqa: E402 |
| 123 | + |
| 124 | +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) |
| 125 | +test_run = get_test_run(dag) |
0 commit comments