Skip to content

Commit 27dac00

Browse files
authored
[AIRFLOW-7099] Improve system test for cloud transfer service (#7794)
1 parent 0974aab commit 27dac00

File tree

7 files changed

+202
-283
lines changed

7 files changed

+202
-283
lines changed

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py renamed to airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

Lines changed: 6 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
.. warning::
2929
You need to provide a large enough set of data so that operations do not execute too quickly.
3030
Otherwise, DAG will fail.
31-
* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
32-
It is also a source bucket in next step
3331
* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
3432
* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
3533
A smaller value than the default value accelerates the system test and ensures its correct execution with
@@ -40,20 +38,19 @@
4038

4139
import os
4240
from datetime import datetime, timedelta
43-
from typing import Any, Dict
4441

4542
from airflow import models
4643
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
4744
ALREADY_EXISTING_IN_SINK, AWS_S3_DATA_SOURCE, BUCKET_NAME, DESCRIPTION, FILTER_JOB_NAMES,
48-
FILTER_PROJECT_ID, GCS_DATA_SINK, GCS_DATA_SOURCE, PROJECT_ID, SCHEDULE, SCHEDULE_END_DATE,
49-
SCHEDULE_START_DATE, START_TIME_OF_DAY, STATUS, TRANSFER_JOB, TRANSFER_JOB_FIELD_MASK, TRANSFER_OPTIONS,
50-
TRANSFER_SPEC, GcpTransferJobsStatus, GcpTransferOperationStatus,
45+
FILTER_PROJECT_ID, GCS_DATA_SINK, PROJECT_ID, SCHEDULE, SCHEDULE_END_DATE, SCHEDULE_START_DATE,
46+
START_TIME_OF_DAY, STATUS, TRANSFER_OPTIONS, TRANSFER_SPEC, GcpTransferJobsStatus,
47+
GcpTransferOperationStatus,
5148
)
5249
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
5350
CloudDataTransferServiceCancelOperationOperator, CloudDataTransferServiceCreateJobOperator,
5451
CloudDataTransferServiceDeleteJobOperator, CloudDataTransferServiceGetOperationOperator,
5552
CloudDataTransferServiceListOperationsOperator, CloudDataTransferServicePauseOperationOperator,
56-
CloudDataTransferServiceResumeOperationOperator, CloudDataTransferServiceUpdateJobOperator,
53+
CloudDataTransferServiceResumeOperationOperator,
5754
)
5855
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
5956
CloudDataTransferServiceJobStatusSensor,
@@ -69,9 +66,6 @@
6966
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
7067
'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
7168
)
72-
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
73-
'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
74-
)
7569

7670
# [START howto_operator_gcp_transfer_create_job_body_aws]
7771
aws_to_gcs_transfer_body = {
@@ -91,40 +85,13 @@
9185
}
9286
# [END howto_operator_gcp_transfer_create_job_body_aws]
9387

94-
# [START howto_operator_gcp_transfer_create_job_body_gcp]
95-
gcs_to_gcs_transfer_body = {
96-
DESCRIPTION: GCP_DESCRIPTION,
97-
STATUS: GcpTransferJobsStatus.ENABLED,
98-
PROJECT_ID: GCP_PROJECT_ID,
99-
SCHEDULE: {
100-
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
101-
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
102-
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
103-
},
104-
TRANSFER_SPEC: {
105-
GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
106-
GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
107-
TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
108-
},
109-
} # type: Dict[str, Any]
110-
# [END howto_operator_gcp_transfer_create_job_body_gcp]
111-
112-
# [START howto_operator_gcp_transfer_update_job_body]
113-
update_body = {
114-
PROJECT_ID: GCP_PROJECT_ID,
115-
TRANSFER_JOB: {DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION)},
116-
TRANSFER_JOB_FIELD_MASK: "description",
117-
}
118-
# [END howto_operator_gcp_transfer_update_job_body]
119-
120-
list_filter_dict = {FILTER_PROJECT_ID: GCP_PROJECT_ID, FILTER_JOB_NAMES: []}
12188

12289
# [START howto_operator_gcp_transfer_default_args]
12390
default_args = {'start_date': days_ago(1)}
12491
# [END howto_operator_gcp_transfer_default_args]
12592

12693
with models.DAG(
127-
'example_gcp_transfer',
94+
'example_gcp_transfer_aws',
12895
default_args=default_args,
12996
schedule_interval=None, # Override to match your needs
13097
tags=['example'],
@@ -152,14 +119,6 @@
152119
)
153120
# [END howto_operator_gcp_transfer_pause_operation]
154121

155-
# [START howto_operator_gcp_transfer_update_job]
156-
update_job = CloudDataTransferServiceUpdateJobOperator(
157-
task_id="update_job",
158-
job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
159-
body=update_body,
160-
)
161-
# [END howto_operator_gcp_transfer_update_job]
162-
163122
# [START howto_operator_gcp_transfer_list_operations]
164123
list_operations = CloudDataTransferServiceListOperationsOperator(
165124
task_id="list_operations",
@@ -192,22 +151,6 @@
192151
)
193152
# [END howto_operator_gcp_transfer_wait_operation]
194153

195-
job_time = datetime.utcnow() + timedelta(minutes=2)
196-
197-
gcs_to_gcs_transfer_body['schedule']['startTimeOfDay'] = (datetime.utcnow() + timedelta(minutes=2)).time()
198-
199-
create_transfer_job_from_gcp = CloudDataTransferServiceCreateJobOperator(
200-
task_id="create_transfer_job_from_gcp", body=gcs_to_gcs_transfer_body
201-
)
202-
203-
wait_for_second_operation_to_start = CloudDataTransferServiceJobStatusSensor(
204-
task_id="wait_for_second_operation_to_start",
205-
job_name="{{ task_instance.xcom_pull('create_transfer_job_from_gcp')['name'] }}",
206-
project_id=GCP_PROJECT_ID,
207-
expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
208-
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
209-
)
210-
211154
# [START howto_operator_gcp_transfer_cancel_operation]
212155
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
213156
task_id="cancel_operation",
@@ -224,13 +167,6 @@
224167
)
225168
# [END howto_operator_gcp_transfer_delete_job]
226169

227-
delete_transfer_from_gcp_job = CloudDataTransferServiceDeleteJobOperator(
228-
task_id="delete_transfer_from_gcp_job",
229-
job_name="{{task_instance.xcom_pull('create_transfer_job_from_gcp')['name']}}",
230-
project_id=GCP_PROJECT_ID,
231-
)
232-
233170
create_transfer_job_from_aws >> wait_for_operation_to_start >> pause_operation >> \
234171
list_operations >> get_operation >> resume_operation >> wait_for_operation_to_end >> \
235-
create_transfer_job_from_gcp >> wait_for_second_operation_to_start >> cancel_operation >> \
236-
delete_transfer_from_aws_job >> delete_transfer_from_gcp_job
172+
cancel_operation >> delete_transfer_from_aws_job
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
21+
22+
23+
This DAG relies on the following OS environment variables
24+
25+
* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
26+
* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
27+
It is also the source bucket in the next step
28+
* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
29+
"""
30+
31+
import os
32+
from datetime import datetime, timedelta
33+
34+
from airflow import models
35+
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
36+
ALREADY_EXISTING_IN_SINK, BUCKET_NAME, DESCRIPTION, FILTER_JOB_NAMES, FILTER_PROJECT_ID, GCS_DATA_SINK,
37+
GCS_DATA_SOURCE, PROJECT_ID, SCHEDULE, SCHEDULE_END_DATE, SCHEDULE_START_DATE, START_TIME_OF_DAY, STATUS,
38+
TRANSFER_JOB, TRANSFER_JOB_FIELD_MASK, TRANSFER_OPTIONS, TRANSFER_SPEC, GcpTransferJobsStatus,
39+
GcpTransferOperationStatus,
40+
)
41+
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
42+
CloudDataTransferServiceCreateJobOperator, CloudDataTransferServiceDeleteJobOperator,
43+
CloudDataTransferServiceGetOperationOperator, CloudDataTransferServiceListOperationsOperator,
44+
CloudDataTransferServiceUpdateJobOperator,
45+
)
46+
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
47+
CloudDataTransferServiceJobStatusSensor,
48+
)
49+
from airflow.utils.dates import days_ago
50+
51+
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
52+
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
53+
"GCP_TRANSFER_FIRST_TARGET_BUCKET", "gcp-transfer-first-target"
54+
)
55+
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
56+
"GCP_TRANSFER_SECOND_TARGET_BUCKET", "gcp-transfer-second-target"
57+
)
58+
59+
# [START howto_operator_gcp_transfer_create_job_body_gcp]
60+
gcs_to_gcs_transfer_body = {
61+
DESCRIPTION: "description",
62+
STATUS: GcpTransferJobsStatus.ENABLED,
63+
PROJECT_ID: GCP_PROJECT_ID,
64+
SCHEDULE: {
65+
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
66+
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
67+
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
68+
},
69+
TRANSFER_SPEC: {
70+
GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
71+
GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
72+
TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
73+
},
74+
}
75+
# [END howto_operator_gcp_transfer_create_job_body_gcp]
76+
77+
# [START howto_operator_gcp_transfer_update_job_body]
78+
update_body = {
79+
PROJECT_ID: GCP_PROJECT_ID,
80+
TRANSFER_JOB: {DESCRIPTION: "description_updated"},
81+
TRANSFER_JOB_FIELD_MASK: "description",
82+
}
83+
# [END howto_operator_gcp_transfer_update_job_body]
84+
85+
default_args = {"start_date": days_ago(1)}
86+
87+
with models.DAG(
88+
"example_gcp_transfer",
89+
default_args=default_args,
90+
schedule_interval=None, # Override to match your needs
91+
tags=["example"],
92+
) as dag:
93+
94+
create_transfer = CloudDataTransferServiceCreateJobOperator(
95+
task_id="create_transfer", body=gcs_to_gcs_transfer_body
96+
)
97+
98+
# [START howto_operator_gcp_transfer_update_job]
99+
update_transfer = CloudDataTransferServiceUpdateJobOperator(
100+
task_id="update_transfer",
101+
job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
102+
body=update_body,
103+
)
104+
# [END howto_operator_gcp_transfer_update_job]
105+
106+
wait_for_transfer = CloudDataTransferServiceJobStatusSensor(
107+
task_id="wait_for_transfer",
108+
job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
109+
project_id=GCP_PROJECT_ID,
110+
expected_statuses={GcpTransferOperationStatus.SUCCESS},
111+
)
112+
113+
list_operations = CloudDataTransferServiceListOperationsOperator(
114+
task_id="list_operations",
115+
request_filter={
116+
FILTER_PROJECT_ID: GCP_PROJECT_ID,
117+
FILTER_JOB_NAMES: [
118+
"{{task_instance.xcom_pull('create_transfer')['name']}}"
119+
],
120+
},
121+
)
122+
123+
get_operation = CloudDataTransferServiceGetOperationOperator(
124+
task_id="get_operation",
125+
operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
126+
)
127+
128+
delete_transfer = CloudDataTransferServiceDeleteJobOperator(
129+
task_id="delete_transfer_from_gcp_job",
130+
job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
131+
project_id=GCP_PROJECT_ID,
132+
)
133+
134+
create_transfer >> wait_for_transfer >> update_transfer >> \
135+
list_operations >> get_operation >> delete_transfer

docs/howto/operator/gcp/cloud_storage_transfer_service.rst

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ For parameter definition, take a look at
6262
Using the operator
6363
""""""""""""""""""
6464

65-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
65+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
6666
:language: python
6767
:start-after: [START howto_operator_gcp_transfer_create_job_body_gcp]
6868
:end-before: [END howto_operator_gcp_transfer_create_job_body_gcp]
6969

70-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
70+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
7171
:language: python
7272
:start-after: [START howto_operator_gcp_transfer_create_job_body_aws]
7373
:end-before: [END howto_operator_gcp_transfer_create_job_body_aws]
7474

75-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
75+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
7676
:language: python
7777
:dedent: 4
7878
:start-after: [START howto_operator_gcp_transfer_create_job]
@@ -107,7 +107,7 @@ For parameter definition, take a look at
107107
Using the operator
108108
""""""""""""""""""
109109

110-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
110+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
111111
:language: python
112112
:dedent: 4
113113
:start-after: [START howto_operator_gcp_transfer_delete_job]
@@ -142,12 +142,12 @@ For parameter definition, take a look at
142142
Using the operator
143143
""""""""""""""""""
144144

145-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
145+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
146146
:language: python
147147
:start-after: [START howto_operator_gcp_transfer_update_job_body]
148148
:end-before: [END howto_operator_gcp_transfer_update_job_body]
149149

150-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
150+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
151151
:language: python
152152
:dedent: 4
153153
:start-after: [START howto_operator_gcp_transfer_update_job]
@@ -181,7 +181,7 @@ For parameter definition, take a look at
181181
Using the operator
182182
""""""""""""""""""
183183

184-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
184+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
185185
:language: python
186186
:dedent: 4
187187
:start-after: [START howto_operator_gcp_transfer_cancel_operation]
@@ -217,7 +217,7 @@ For parameter definition, take a look at
217217
Using the operator
218218
""""""""""""""""""
219219

220-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
220+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
221221
:language: python
222222
:dedent: 4
223223
:start-after: [START howto_operator_gcp_transfer_get_operation]
@@ -252,7 +252,7 @@ For parameter definition, take a look at
252252
Using the operator
253253
""""""""""""""""""
254254

255-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
255+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
256256
:language: python
257257
:dedent: 4
258258
:start-after: [START howto_operator_gcp_transfer_list_operations]
@@ -286,7 +286,7 @@ For parameter definition, take a look at
286286
Using the operator
287287
""""""""""""""""""
288288

289-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
289+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
290290
:language: python
291291
:dedent: 4
292292
:start-after: [START howto_operator_gcp_transfer_pause_operation]
@@ -320,7 +320,7 @@ For parameter definition, take a look at
320320
Using the operator
321321
""""""""""""""""""
322322

323-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
323+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
324324
:language: python
325325
:dedent: 4
326326
:start-after: [START howto_operator_gcp_transfer_resume_operation]
@@ -355,7 +355,7 @@ For parameter definition, take a look at
355355
Using the operator
356356
""""""""""""""""""
357357

358-
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service.py
358+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
359359
:language: python
360360
:dedent: 4
361361
:start-after: [START howto_operator_gcp_transfer_wait_operation]

0 commit comments

Comments
 (0)