Skip to content

Commit e4757d6

Browse files
authored
Fix GCSToGCSOperator copy without wildcard and exact_match=True (#32376)
1 parent 2b841f7 commit e4757d6

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

airflow/providers/google/cloud/transfers/gcs_to_gcs.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
370370
self.source_bucket, prefix=prefix, delimiter=self.delimiter, match_glob=self.match_glob
371371
)
372372

373+
objects = [obj for obj in objects if self._check_exact_match(obj, prefix)]
374+
373375
if not self.replace:
374376
# If we are not replacing, ignore files already existing in source buckets
375377
objects = self._ignore_existing_files(
@@ -405,7 +407,7 @@ def _copy_file(self, hook, source_object):
405407
def _copy_directory(self, hook, source_objects, prefix):
406408
_prefix = prefix.rstrip("/") + "/"
407409
for source_obj in source_objects:
408-
if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
410+
if not self._check_exact_match(source_obj, prefix):
409411
continue
410412
if self.destination_object is None:
411413
destination_object = source_obj
@@ -417,6 +419,12 @@ def _copy_directory(self, hook, source_objects, prefix):
417419
hook=hook, source_object=source_obj, destination_object=destination_object
418420
)
419421

422+
def _check_exact_match(self, source_object: str, prefix: str) -> bool:
    """Return whether ``source_object`` passes the ``exact_match`` filter for ``prefix``.

    When ``self.exact_match`` is falsy the filter is disabled and every object
    passes. When it is truthy, only an object whose name is equal to ``prefix``
    passes.

    :param source_object: name of the object being considered for copying.
    :param prefix: the prefix the object name is compared against.
    :return: ``True`` if the object should be kept, ``False`` if it should be skipped.
    """
    if not self.exact_match:
        return True
    # The previous condition also carried ``or not source_object.endswith(prefix)``.
    # That clause was only ever evaluated when the two names were already equal,
    # where ``endswith`` is necessarily True, so it could never change the result.
    # Plain equality is the actual contract.
    return source_object == prefix
427+
420428
def _copy_source_with_wildcard(self, hook, prefix):
421429
total_wildcards = prefix.count(WILDCARD)
422430
if total_wildcards > 1:

tests/providers/google/cloud/transfers/test_gcs_to_gcs.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
128128
def test_copy_file_with_exact_match(self, mock_hook):
129129
SOURCE_FILES = [
130130
"test_object.txt",
131+
"test_object.txt.abc",
131132
"test_object.txt.copy/",
132133
"test_object.txt.folder/",
133134
]
@@ -145,6 +146,42 @@ def test_copy_file_with_exact_match(self, mock_hook):
145146
mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, match_glob=None),
146147
]
147148
mock_hook.return_value.list.assert_has_calls(mock_calls)
149+
mock_hook.return_value.rewrite.assert_has_calls(
150+
[
151+
mock.call(TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"),
152+
]
153+
)
154+
155+
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_copy_file_with_exact_match_destination(self, mock_hook):
    """Copy with ``exact_match=True`` and an explicit ``destination_object``.

    The mocked listing returns the exact object name together with names that
    merely start with it. The test asserts that the bucket was listed with the
    expected prefix and that the exactly-named object was rewritten to the
    requested destination object.
    """
    listed_objects = [
        "test_object.txt",
        "test_object.txt.abc",
        "test_object.txt.copy/",
        "test_object.txt.folder/",
    ]
    target_object = f"{DESTINATION_OBJECT_PREFIX}/test_object.txt"

    # Every GCSHook() instantiation yields this same mock instance.
    gcs_hook = mock_hook.return_value
    gcs_hook.list.return_value = listed_objects

    GCSToGCSOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=SOURCE_OBJECT_NO_WILDCARD,
        destination_bucket=DESTINATION_BUCKET,
        destination_object=target_object,
        exact_match=True,
    ).execute(None)

    gcs_hook.list.assert_has_calls(
        [mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, match_glob=None)]
    )
    # NOTE(review): assert_has_calls tolerates additional calls, so this does not
    # by itself prove the non-matching objects were skipped - consider a stricter
    # assertion such as assert_called_once_with.
    gcs_hook.rewrite.assert_has_calls(
        [mock.call(TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, target_object)]
    )
148185

149186
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
150187
def test_execute_prefix_and_suffix(self, mock_hook):

0 commit comments

Comments
 (0)