
Commit 3c08c02

Move validation of templated input params to run after the context init (#19048)
* Fix #14682, move input params validation into `execute()`
* Adjust tests for LocalFilesystemToS3Operator
1 parent efdfd15 commit 3c08c02
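
For context: before this commit these operators validated their templated params in `__init__`, i.e. at DAG-parse time, before Jinja rendering. A minimal sketch of the failure mode behind #14682 (the DAG id, Variable name, and paths are hypothetical, not taken from the commit):

    from datetime import datetime
    from airflow import DAG
    from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

    with DAG(dag_id="example_local_to_s3", start_date=datetime(2021, 10, 1)) as dag:
        upload = LocalFilesystemToS3Operator(
            task_id="upload",
            filename="/tmp/report.csv",
            # Renders to a full 's3://...' path only at run time; at parse
            # time __init__ saw just the raw template string below.
            dest_key="{{ var.value.full_s3_dest_path }}",
            dest_bucket="my-bucket",  # invalid together with a full s3:// dest_key
        )

With the old check in `__init__`, `'s3://' in self.dest_key` was evaluated against the raw template, so this invalid combination slipped through parsing and was never validated; after this commit, `_check_inputs()` runs inside `execute()` on the rendered value and raises the intended `TypeError`.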

File tree

4 files changed (+18, -11 lines)


airflow/providers/amazon/aws/transfers/local_to_s3.py

Lines changed: 2 additions & 0 deletions

@@ -94,10 +94,12 @@ def __init__(
         self.gzip = gzip
         self.acl_policy = acl_policy

+    def _check_inputs(self):
         if 's3://' in self.dest_key and self.dest_bucket is not None:
             raise TypeError('dest_bucket should be None when dest_key is provided as a full s3:// file path.')

     def execute(self, context):
+        self._check_inputs()
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         s3_hook.load_file(
             self.filename,
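
Why this works: the task runner renders `template_fields` on the operator instance before calling `execute()`, so by the time `_check_inputs()` runs, `self.dest_key` holds the final string. A simplified sketch of that ordering (the real sequence lives in Airflow's `TaskInstance`; shown here only to illustrate):

    context = task_instance.get_template_context()
    task.render_template_fields(context)  # '{{ ... }}' -> concrete values, in place
    task.execute(context)                 # _check_inputs() now sees rendered values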

airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py

Lines changed: 3 additions & 1 deletion

@@ -103,7 +103,8 @@ def __init__(
         self.gzip = gzip
         self.google_impersonation_chain = google_impersonation_chain

-        if dest_gcs and not gcs_object_is_directory(self.dest_gcs):
+    def _check_inputs(self) -> None:
+        if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
             self.log.info(
                 'Destination Google Cloud Storage path is not a valid '
                 '"directory", define a path that ends with a slash "/" or '
@@ -114,6 +115,7 @@ def __init__(
             )

     def execute(self, context):
+        self._check_inputs()
         azure_fileshare_hook = AzureFileShareHook(self.azure_fileshare_conn_id)
         files = azure_fileshare_hook.list_files(
             share_name=self.share_name, directory_name=self.directory_name

airflow/providers/google/cloud/transfers/s3_to_gcs.py

Lines changed: 3 additions & 1 deletion

@@ -147,7 +147,8 @@ def __init__(
         self.gzip = gzip
         self.google_impersonation_chain = google_impersonation_chain

-        if dest_gcs and not gcs_object_is_directory(self.dest_gcs):
+    def _check_inputs(self) -> None:
+        if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
             self.log.info(
                 'Destination Google Cloud Storage path is not a valid '
                 '"directory", define a path that ends with a slash "/" or '
@@ -158,6 +159,7 @@ def __init__(
             )

     def execute(self, context):
+        self._check_inputs()
         # use the super method to list all the files in an S3 bucket/key
         files = super().execute(context)

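
Both GCS operators gate on `gcs_object_is_directory(self.dest_gcs)`, which only answers meaningfully once `dest_gcs` holds a real `gs://` URL. A small illustration of what the moved check sees (bucket and object paths are made up):

    from airflow.providers.google.cloud.hooks.gcs import gcs_object_is_directory

    gcs_object_is_directory("gs://my-bucket/exports/")     # True: ends with '/'
    gcs_object_is_directory("gs://my-bucket/exports.csv")  # False: triggers the log.info warning

Before this commit the same call ran in `__init__` against the raw, unrendered template (e.g. `"{{ var.value.gcs_dest }}"`), which is not a valid `gs://` URL at all.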

tests/providers/amazon/aws/transfers/test_local_to_s3.py

Lines changed: 10 additions & 9 deletions

@@ -61,16 +61,17 @@ def test_init(self):
         assert operator.encrypt == self._config['encrypt']
         assert operator.gzip == self._config['gzip']

-    def test_init_exception(self):
+    def test_execute_exception(self):
+        operator = LocalFilesystemToS3Operator(
+            task_id='file_to_s3_operatro_exception',
+            dag=self.dag,
+            filename=self.testfile1,
+            dest_key=f's3://dummy/{self.dest_key}',
+            dest_bucket=self.dest_bucket,
+            **self._config,
+        )
         with self.assertRaises(TypeError):
-            LocalFilesystemToS3Operator(
-                task_id='file_to_s3_operatro_exception',
-                dag=self.dag,
-                filename=self.testfile1,
-                dest_key=f's3://dummy/{self.dest_key}',
-                dest_bucket=self.dest_bucket,
-                **self._config,
-            )
+            operator.execute(None)

     @mock_s3
     def test_execute(self):
