Skip to content

Commit 5ae82a5

Browse files
authored
Fix Google DLP example and improve ops idempotency (#10608)
1 parent 3867f76 commit 5ae82a5

File tree

3 files changed

+56
-30
lines changed

3 files changed

+56
-30
lines changed

airflow/providers/google/cloud/example_dags/example_dlp.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,25 @@
5252
)
5353
INSPECT_CONFIG = InspectConfig(info_types=[{"name": "PHONE_NUMBER"}, {"name": "US_TOLLFREE_PHONE_NUMBER"}])
5454
INSPECT_TEMPLATE = InspectTemplate(inspect_config=INSPECT_CONFIG)
55+
OUTPUT_BUCKET = os.environ.get("DLP_OUTPUT_BUCKET", "gs://test-dlp-airflow")
56+
OUTPUT_FILENAME = "test.txt"
5557

58+
OBJECT_GCS_URI = os.path.join(OUTPUT_BUCKET, "tmp")
59+
OBJECT_GCS_OUTPUT_URI = os.path.join(OUTPUT_BUCKET, "tmp", OUTPUT_FILENAME)
5660

5761
with models.DAG(
5862
"example_gcp_dlp",
5963
schedule_interval=None, # Override to match your needs
6064
start_date=days_ago(1),
6165
tags=['example'],
62-
) as dag:
66+
) as dag1:
6367
# [START howto_operator_dlp_create_inspect_template]
6468
create_template = CloudDLPCreateInspectTemplateOperator(
6569
project_id=GCP_PROJECT,
6670
inspect_template=INSPECT_TEMPLATE,
6771
template_id=TEMPLATE_ID,
6872
task_id="create_template",
6973
do_xcom_push=True,
70-
dag=dag,
7174
)
7275
# [END howto_operator_dlp_create_inspect_template]
7376

@@ -77,37 +80,42 @@
7780
project_id=GCP_PROJECT,
7881
item=ITEM,
7982
inspect_template_name="{{ task_instance.xcom_pull('create_template', key='return_value')['name'] }}",
80-
dag=dag,
8183
)
8284
# [END howto_operator_dlp_use_inspect_template]
8385

8486
# [START howto_operator_dlp_delete_inspect_template]
8587
delete_template = CloudDLPDeleteInspectTemplateOperator(
86-
task_id="delete_template", template_id=TEMPLATE_ID, project_id=GCP_PROJECT, dag=dag,
88+
task_id="delete_template", template_id=TEMPLATE_ID, project_id=GCP_PROJECT,
8789
)
8890
# [END howto_operator_dlp_delete_inspect_template]
8991

9092
create_template >> inspect_content >> delete_template
9193

92-
93-
CUSTOM_INFO_TYPES = [{"info_type": {"name": "C_MRN"}, "regex": {"pattern": "[1-9]{3}-[1-9]{1}-[1-9]{5}"},}]
9494
CUSTOM_INFO_TYPE_ID = "custom_info_type"
95-
UPDATE_CUSTOM_INFO_TYPE = [
96-
{"info_type": {"name": "C_MRN"}, "regex": {"pattern": "[a-z]{3}-[a-z]{1}-[a-z]{5}"},}
97-
]
95+
CUSTOM_INFO_TYPES = {
96+
"large_custom_dictionary": {
97+
"output_path": {"path": OBJECT_GCS_OUTPUT_URI},
98+
"cloud_storage_file_set": {"url": OBJECT_GCS_URI + "/"},
99+
}
100+
}
101+
UPDATE_CUSTOM_INFO_TYPE = {
102+
"large_custom_dictionary": {
103+
"output_path": {"path": OBJECT_GCS_OUTPUT_URI},
104+
"cloud_storage_file_set": {"url": OBJECT_GCS_URI + "/"},
105+
}
106+
}
98107

99108
with models.DAG(
100109
"example_gcp_dlp_info_types",
101110
schedule_interval=None,
102111
start_date=days_ago(1),
103112
tags=["example", "dlp", "info-types"],
104-
) as dag:
113+
) as dag2:
105114
# [START howto_operator_dlp_create_info_type]
106115
create_info_type = CloudDLPCreateStoredInfoTypeOperator(
107116
project_id=GCP_PROJECT,
108117
config=CUSTOM_INFO_TYPES,
109118
stored_info_type_id=CUSTOM_INFO_TYPE_ID,
110-
dag=dag,
111119
task_id="create_info_type",
112120
)
113121
# [END howto_operator_dlp_create_info_type]
@@ -116,57 +124,53 @@
116124
project_id=GCP_PROJECT,
117125
stored_info_type_id=CUSTOM_INFO_TYPE_ID,
118126
config=UPDATE_CUSTOM_INFO_TYPE,
119-
dag=dag,
120127
task_id="update_info_type",
121128
)
122129
# [END howto_operator_dlp_update_info_type]
123130
# [START howto_operator_dlp_delete_info_type]
124131
delete_info_type = CloudDLPDeleteStoredInfoTypeOperator(
125-
project_id=GCP_PROJECT, stored_info_type_id=CUSTOM_INFO_TYPE_ID, dag=dag, task_id="delete_info_type",
132+
project_id=GCP_PROJECT, stored_info_type_id=CUSTOM_INFO_TYPE_ID, task_id="delete_info_type",
126133
)
127134
# [END howto_operator_dlp_delete_info_type]
128135
create_info_type >> update_info_type >> delete_info_type
129136

130-
SCHEDULE = {"recurrence_period_duration": {"seconds": 60 * 60 * 24}}
131-
JOB = {
132-
"inspect_config": INSPECT_CONFIG,
133-
}
134-
135137
JOB_TRIGGER = {
136-
"inspect_job": JOB,
137-
"triggers": [{"schedule": SCHEDULE}],
138+
"inspect_job": {
139+
"storage_config": {
140+
"datastore_options": {"partition_id": {"project_id": GCP_PROJECT}, "kind": {"name": "test"}}
141+
}
142+
},
143+
"triggers": [{"schedule": {"recurrence_period_duration": {"seconds": 60 * 60 * 24}}}],
138144
"status": "HEALTHY",
139145
}
140146

141147
TRIGGER_ID = "example_trigger"
142148

143149
with models.DAG(
144-
"example_gcp_dlp_job", schedule_interval=None, start_date=days_ago(1), tags=["example", "dlp_job"],
145-
) as dag: # [START howto_operator_dlp_create_job_trigger]
150+
"example_gcp_dlp_job", schedule_interval=None, start_date=days_ago(1), tags=["example", "dlp_job"]
151+
) as dag3: # [START howto_operator_dlp_create_job_trigger]
146152
create_trigger = CloudDLPCreateJobTriggerOperator(
147153
project_id=GCP_PROJECT,
148154
job_trigger=JOB_TRIGGER,
149155
trigger_id=TRIGGER_ID,
150-
dag=dag,
151156
task_id="create_trigger",
152157
)
153158
# [END howto_operator_dlp_create_job_trigger]
154-
UPDATED_SCHEDULE = {"recurrence_period_duration": {"seconds": 2 * 60 * 60 * 24}}
155159

156-
JOB_TRIGGER["triggers"] = [{"schedule": UPDATED_SCHEDULE}]
160+
JOB_TRIGGER["triggers"] = [{"schedule": {"recurrence_period_duration": {"seconds": 2 * 60 * 60 * 24}}}]
157161

158162
# [START howto_operator_dlp_update_job_trigger]
159163
update_trigger = CloudDLPUpdateJobTriggerOperator(
160164
project_id=GCP_PROJECT,
161165
job_trigger_id=TRIGGER_ID,
162166
job_trigger=JOB_TRIGGER,
163-
dag=dag,
164167
task_id="update_info_type",
165168
)
166169
# [END howto_operator_dlp_update_job_trigger]
167170
# [START howto_operator_dlp_delete_job_trigger]
168171
delete_trigger = CloudDLPDeleteJobTriggerOperator(
169-
project_id=GCP_PROJECT, job_trigger_id=TRIGGER_ID, dag=dag, task_id="delete_info_type",
172+
project_id=GCP_PROJECT, job_trigger_id=TRIGGER_ID, task_id="delete_info_type"
170173
)
171174
# [END howto_operator_dlp_delete_job_trigger]
172175
create_trigger >> update_trigger >> delete_trigger
176+

airflow/providers/google/cloud/operators/dlp.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,9 @@ def execute(self, context):
520520
timeout=self.timeout,
521521
metadata=self.metadata,
522522
)
523-
except AlreadyExists:
523+
except InvalidArgument as e:
524+
if "already in use" not in e.message:
525+
raise
524526
trigger = hook.get_job_trigger(
525527
project_id=self.project_id,
526528
job_trigger_id=self.trigger_id,
@@ -621,7 +623,9 @@ def execute(self, context):
621623
timeout=self.timeout,
622624
metadata=self.metadata,
623625
)
624-
except AlreadyExists:
626+
except InvalidArgument as e:
627+
if "already exists" not in e.message:
628+
raise
625629
info = hook.get_stored_info_type(
626630
organization_id=self.organization_id,
627631
project_id=self.project_id,

tests/providers/google/cloud/operators/test_dlp_system.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,29 @@
2525

2626
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DLP_KEY
2727
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
28+
from airflow.providers.google.cloud.example_dags.example_dlp import OUTPUT_BUCKET, OUTPUT_FILENAME
29+
30+
31+
@pytest.fixture(scope="class")
32+
def helper():
33+
GoogleSystemTest.create_gcs_bucket(OUTPUT_BUCKET)
34+
GoogleSystemTest.upload_content_to_gcs("aaaa\nbbbb", OUTPUT_BUCKET, f"tmp/{OUTPUT_FILENAME}")
35+
yield
36+
GoogleSystemTest.delete_gcs_bucket(OUTPUT_BUCKET)
2837

2938

3039
@pytest.mark.backend("mysql", "postgres")
40+
@pytest.mark.usefixtures("helper")
3141
@pytest.mark.credential_file(GCP_DLP_KEY)
3242
class GcpDLPExampleDagsSystemTest(GoogleSystemTest):
3343
@provide_gcp_context(GCP_DLP_KEY)
34-
def test_run_example_dag_function(self):
44+
def test_run_example_dag(self):
3545
self.run_dag('example_gcp_dlp', CLOUD_DAG_FOLDER)
46+
47+
@provide_gcp_context(GCP_DLP_KEY)
48+
def test_run_example_info_types(self):
49+
self.run_dag('example_gcp_dlp_info_types', CLOUD_DAG_FOLDER)
50+
51+
@provide_gcp_context(GCP_DLP_KEY)
52+
def test_run_example_dlp_job(self):
53+
self.run_dag('example_gcp_dlp_job', CLOUD_DAG_FOLDER)

0 commit comments

Comments
 (0)