Skip to content

Commit cb2f339

Browse files
authored
[AIRFLOW-6973] Make GCSCreateBucketOperator idempotent (#7609)
1 parent 09fea3c commit cb2f339

File tree

3 files changed

+14
-10
lines changed

3 files changed

+14
-10
lines changed

airflow/providers/google/cloud/example_dags/example_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@
144144
list_buckets >> delete_bucket_1
145145
upload_file >> delete_bucket_1
146146
create_bucket1 >> upload_file >> delete_bucket_1
147-
transform_file >> delete_bucket_1
147+
upload_file >> transform_file >> delete_bucket_1
148148
gcs_bucket_create_acl_entry_task >> delete_bucket_1
149149
gcs_object_create_acl_entry_task >> delete_bucket_1
150150
download_file >> delete_bucket_1

airflow/providers/google/cloud/operators/gcs.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from tempfile import NamedTemporaryFile
2525
from typing import Dict, Iterable, List, Optional, Union
2626

27+
from google.api_core.exceptions import Conflict
28+
2729
from airflow.exceptions import AirflowException
2830
from airflow.models import BaseOperator
2931
from airflow.models.xcom import MAX_XCOM_SIZE
@@ -133,13 +135,15 @@ def execute(self, context):
133135
google_cloud_storage_conn_id=self.gcp_conn_id,
134136
delegate_to=self.delegate_to
135137
)
136-
137-
hook.create_bucket(bucket_name=self.bucket_name,
138-
resource=self.resource,
139-
storage_class=self.storage_class,
140-
location=self.location,
141-
project_id=self.project_id,
142-
labels=self.labels)
138+
try:
139+
hook.create_bucket(bucket_name=self.bucket_name,
140+
resource=self.resource,
141+
storage_class=self.storage_class,
142+
location=self.location,
143+
project_id=self.project_id,
144+
labels=self.labels)
145+
except Conflict: # HTTP 409
146+
self.log.warning("Bucket %s already exists", self.bucket_name)
143147

144148

145149
class GCSListObjectsOperator(BaseOperator):

tests/providers/google/cloud/operators/test_postgres_to_gcs_system.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from airflow.providers.postgres.hooks.postgres import PostgresHook
2222
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
23-
from tests.test_utils.gcp_system_helpers import GoogleSystemTest, provide_gcp_context
23+
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
2424

2525
GCS_BUCKET = "postgres_to_gcs_example"
2626
CREATE_QUERY = """
@@ -79,7 +79,7 @@ def setUp(self):
7979

8080
@provide_gcp_context(GCP_GCS_KEY)
8181
def test_run_example_dag(self):
82-
self.run_dag('example_postgres_to_gcs', 'airflow/example_dags')
82+
self.run_dag('example_postgres_to_gcs', CLOUD_DAG_FOLDER)
8383

8484
@provide_gcp_context(GCP_GCS_KEY)
8585
def tearDown(self):

0 commit comments

Comments
 (0)