Skip to content

Commit 79a2fa7

Browse files
authored
Merge GCSObjectExistenceAsyncSensor logic to GCSObjectExistenceSensor (#30014)
1 parent 6a6bff3 commit 79a2fa7

File tree

4 files changed

+123
-41
lines changed

4 files changed

+123
-41
lines changed

airflow/providers/google/cloud/sensors/gcs.py

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import textwrap
2323
import warnings
2424
from datetime import datetime, timedelta
25-
from typing import TYPE_CHECKING, Callable, Sequence
25+
from typing import TYPE_CHECKING, Any, Callable, Sequence
2626

2727
from google.api_core.retry import Retry
2828
from google.cloud.storage.retry import DEFAULT_RETRY
@@ -75,6 +75,7 @@ def __init__(
7575
delegate_to: str | None = None,
7676
impersonation_chain: str | Sequence[str] | None = None,
7777
retry: Retry = DEFAULT_RETRY,
78+
deferrable: bool = False,
7879
**kwargs,
7980
) -> None:
8081

@@ -90,6 +91,8 @@ def __init__(
9091
self.impersonation_chain = impersonation_chain
9192
self.retry = retry
9293

94+
self.deferrable = deferrable
95+
9396
def poke(self, context: Context) -> bool:
9497
self.log.info("Sensor checks existence of : %s, %s", self.bucket, self.object)
9598
hook = GCSHook(
@@ -99,10 +102,43 @@ def poke(self, context: Context) -> bool:
99102
)
100103
return hook.exists(self.bucket, self.object, self.retry)
101104

105+
def execute(self, context: Context) -> None:
106+
"""Run the regular sensor ``execute`` unless ``deferrable`` is set, in which case defer to ``GCSBlobTrigger``."""
107+
if not self.deferrable:
108+
super().execute(context)
109+
else:
110+
self.defer(
111+
timeout=timedelta(seconds=self.timeout),
112+
trigger=GCSBlobTrigger(
113+
bucket=self.bucket,
114+
object_name=self.object,
115+
poke_interval=self.poke_interval,
116+
google_cloud_conn_id=self.google_cloud_conn_id,
117+
hook_params={
118+
"delegate_to": self.delegate_to,
119+
"impersonation_chain": self.impersonation_chain,
120+
},
121+
),
122+
method_name="execute_complete",
123+
)
124+
125+
def execute_complete(self, context: Context, event: dict[str, str]) -> str:
126+
"""
127+
Callback for when the trigger fires - returns immediately.
128+
Relies on trigger to throw an exception, otherwise it assumes execution was
129+
successful.
130+
"""
131+
if event["status"] == "error":
132+
raise AirflowException(event["message"])
133+
self.log.info("File %s was found in bucket %s.", self.object, self.bucket)
134+
return event["message"]
135+
102136

103137
class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
104138
"""
105-
Checks for the existence of a file in Google Cloud Storage .
139+
Checks for the existence of a file in Google Cloud Storage.
140+
Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release.
141+
Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead.
106142
107143
:param bucket: The Google Cloud Storage bucket where the object is.
108144
:param object: The name of the object to check in the Google cloud storage bucket.
@@ -120,33 +156,13 @@ class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
120156
account from the list granting this role to the originating account (templated).
121157
"""
122158

123-
def execute(self, context: Context) -> None:
124-
"""Airflow runs this method on the worker and defers using the trigger."""
125-
self.defer(
126-
timeout=timedelta(seconds=self.timeout),
127-
trigger=GCSBlobTrigger(
128-
bucket=self.bucket,
129-
object_name=self.object,
130-
poke_interval=self.poke_interval,
131-
google_cloud_conn_id=self.google_cloud_conn_id,
132-
hook_params={
133-
"delegate_to": self.delegate_to,
134-
"impersonation_chain": self.impersonation_chain,
135-
},
136-
),
137-
method_name="execute_complete",
159+
def __init__(self, **kwargs: Any) -> None:
160+
warnings.warn(
161+
"Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. "
162+
"Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead",
163+
DeprecationWarning,
138164
)
139-
140-
def execute_complete(self, context: Context, event: dict[str, str]) -> str:
141-
"""
142-
Callback for when the trigger fires - returns immediately.
143-
Relies on trigger to throw an exception, otherwise it assumes execution was
144-
successful.
145-
"""
146-
if event["status"] == "error":
147-
raise AirflowException(event["message"])
148-
self.log.info("File %s was found in bucket %s.", self.object, self.bucket)
149-
return event["message"]
165+
super().__init__(deferrable=True, **kwargs)
150166

151167

152168
def ts_function(context):

docs/apache-airflow-providers-google/operators/cloud/gcs.rst

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,20 @@ Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSe
164164
:start-after: [START howto_sensor_object_exists_task]
165165
:end-before: [END howto_sensor_object_exists_task]
166166

167+
You can also use deferrable mode in this sensor if you would like to free up worker slots while the sensor is running.
168+
169+
.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
170+
:language: python
171+
:dedent: 4
172+
:start-after: [START howto_sensor_object_exists_task_defered]
173+
:end-before: [END howto_sensor_object_exists_task_defered]
167174

168175
.. _howto/sensor:GCSObjectExistenceAsyncSensor:
169176

170177
GCSObjectExistenceAsyncSensor
171178
-----------------------------
172179

173-
Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor`
174-
(deferrable version) if you would like to free up the worker slots while the sensor is running.
180+
:class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. Please use :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` with deferrable mode enabled (``deferrable=True``) instead.
175181

176182
.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
177183
:language: python

tests/providers/google/cloud/sensors/test_gcs.py

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,47 +98,96 @@ def test_should_pass_argument_to_hook(self, mock_hook):
9898
)
9999
mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
100100

101-
102-
class TestGoogleCloudStorageObjectSensorAsync:
103-
def test_gcs_object_existence_sensor_async(self):
101+
def test_gcs_object_existence_sensor_deferred(self):
104102
"""
105103
Asserts that a task is deferred and a GCSBlobTrigger will be fired
106-
when the GCSObjectExistenceAsyncSensor is executed.
104+
when the GCSObjectExistenceSensor is executed and deferrable is set to True.
107105
"""
108-
task = GCSObjectExistenceAsyncSensor(
106+
task = GCSObjectExistenceSensor(
109107
task_id="task-id",
110108
bucket=TEST_BUCKET,
111109
object=TEST_OBJECT,
112110
google_cloud_conn_id=TEST_GCP_CONN_ID,
111+
deferrable=True,
113112
)
114113
with pytest.raises(TaskDeferred) as exc:
115114
task.execute(context)
116115
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
117116

118-
def test_gcs_object_existence_sensor_async_execute_failure(self):
119-
"""Tests that an AirflowException is raised in case of error event"""
120-
task = GCSObjectExistenceAsyncSensor(
117+
def test_gcs_object_existence_sensor_deferred_execute_failure(self):
118+
"""Tests that an AirflowException is raised in case of error event when deferrable is set to True"""
119+
task = GCSObjectExistenceSensor(
121120
task_id="task-id",
122121
bucket=TEST_BUCKET,
123122
object=TEST_OBJECT,
124123
google_cloud_conn_id=TEST_GCP_CONN_ID,
124+
deferrable=True,
125125
)
126126
with pytest.raises(AirflowException):
127127
task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
128128

129129
def test_gcs_object_existence_sensor_async_execute_complete(self):
130-
"""Asserts that logging occurs as expected"""
131-
task = GCSObjectExistenceAsyncSensor(
130+
"""Asserts that logging occurs as expected when deferrable is set to True"""
131+
task = GCSObjectExistenceSensor(
132132
task_id="task-id",
133133
bucket=TEST_BUCKET,
134134
object=TEST_OBJECT,
135135
google_cloud_conn_id=TEST_GCP_CONN_ID,
136+
deferrable=True,
136137
)
137138
with mock.patch.object(task.log, "info") as mock_log_info:
138139
task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
139140
mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)
140141

141142

143+
class TestGoogleCloudStorageObjectSensorAsync:
144+
depcrecation_message = (
145+
"Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. "
146+
"Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead"
147+
)
148+
149+
def test_gcs_object_existence_sensor_async(self):
150+
"""
151+
Asserts that a task is deferred and a GCSBlobTrigger will be fired
152+
when the GCSObjectExistenceAsyncSensor is executed.
153+
"""
154+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
155+
task = GCSObjectExistenceAsyncSensor(
156+
task_id="task-id",
157+
bucket=TEST_BUCKET,
158+
object=TEST_OBJECT,
159+
google_cloud_conn_id=TEST_GCP_CONN_ID,
160+
)
161+
with pytest.raises(TaskDeferred) as exc:
162+
task.execute(context)
163+
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
164+
165+
def test_gcs_object_existence_sensor_async_execute_failure(self):
166+
"""Tests that an AirflowException is raised in case of error event"""
167+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
168+
task = GCSObjectExistenceAsyncSensor(
169+
task_id="task-id",
170+
bucket=TEST_BUCKET,
171+
object=TEST_OBJECT,
172+
google_cloud_conn_id=TEST_GCP_CONN_ID,
173+
)
174+
with pytest.raises(AirflowException):
175+
task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
176+
177+
def test_gcs_object_existence_sensor_async_execute_complete(self):
178+
"""Asserts that logging occurs as expected"""
179+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
180+
task = GCSObjectExistenceAsyncSensor(
181+
task_id="task-id",
182+
bucket=TEST_BUCKET,
183+
object=TEST_OBJECT,
184+
google_cloud_conn_id=TEST_GCP_CONN_ID,
185+
)
186+
with mock.patch.object(task.log, "info") as mock_log_info:
187+
task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
188+
mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)
189+
190+
142191
class TestTsFunction:
143192
def test_should_support_datetime(self):
144193
context = {

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ def mode_setter(self, value):
124124
)
125125
# [END howto_sensor_object_exists_task_async]
126126

127+
# [START howto_sensor_object_exists_task_defered]
128+
gcs_object_exists_defered = GCSObjectExistenceSensor(
129+
bucket=BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
130+
)
131+
# [END howto_sensor_object_exists_task_defered]
132+
127133
# [START howto_sensor_object_with_prefix_exists_task]
128134
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
129135
bucket=BUCKET_NAME,
@@ -144,7 +150,12 @@ def mode_setter(self, value):
144150
sleep,
145151
upload_file,
146152
# TEST BODY
147-
[gcs_object_exists, gcs_object_exists_async, gcs_object_with_prefix_exists],
153+
[
154+
gcs_object_exists,
155+
gcs_object_exists_defered,
156+
gcs_object_exists_async,
157+
gcs_object_with_prefix_exists,
158+
],
148159
# TEST TEARDOWN
149160
delete_bucket,
150161
)

0 commit comments

Comments
 (0)