Skip to content

Commit 72ba63e

Browse files
authored
added Topic params for schema_settings and message_retention_duration. (#35767)
1 parent f4e5571 commit 72ba63e

File tree

4 files changed

+29
-1
lines changed

4 files changed

+29
-1
lines changed

airflow/providers/google/cloud/hooks/pubsub.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
PushConfig,
5858
ReceivedMessage,
5959
RetryPolicy,
60+
SchemaSettings,
6061
)
6162

6263

@@ -182,6 +183,8 @@ def create_topic(
182183
labels: dict[str, str] | None = None,
183184
message_storage_policy: dict | MessageStoragePolicy = None,
184185
kms_key_name: str | None = None,
186+
schema_settings: dict | SchemaSettings = None,
187+
message_retention_duration: str | None = None,
185188
retry: Retry | _MethodDefault = DEFAULT,
186189
timeout: float | None = None,
187190
metadata: Sequence[tuple[str, str]] = (),
@@ -206,6 +209,11 @@ def create_topic(
206209
to be used to protect access to messages published on this topic.
207210
The expected format is
208211
``projects/*/locations/*/keyRings/*/cryptoKeys/*``.
212+
:param schema_settings: (Optional) Settings for validating messages published against an
213+
existing schema. The expected format is ``projects/*/schemas/*``.
214+
:param message_retention_duration: (Optional) Indicates the minimum duration to retain a
215+
message after it is published to the topic. The expected format is a duration in
216+
seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".
209217
:param retry: (Optional) A retry object used to retry requests.
210218
If None is specified, requests will not be retried.
211219
:param timeout: (Optional) The amount of time, in seconds, to wait for the request
@@ -228,6 +236,8 @@ def create_topic(
228236
"labels": labels,
229237
"message_storage_policy": message_storage_policy,
230238
"kms_key_name": kms_key_name,
239+
"schema_settings": schema_settings,
240+
"message_retention_duration": message_retention_duration,
231241
},
232242
retry=retry,
233243
timeout=timeout,

airflow/providers/google/cloud/operators/pubsub.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
PushConfig,
3636
ReceivedMessage,
3737
RetryPolicy,
38+
SchemaSettings,
3839
)
3940

4041
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
@@ -130,6 +131,8 @@ def __init__(
130131
labels: dict[str, str] | None = None,
131132
message_storage_policy: dict | MessageStoragePolicy = None,
132133
kms_key_name: str | None = None,
134+
schema_settings: dict | SchemaSettings = None,
135+
message_retention_duration: str | None = None,
133136
retry: Retry | _MethodDefault = DEFAULT,
134137
timeout: float | None = None,
135138
metadata: Sequence[tuple[str, str]] = (),
@@ -144,6 +147,8 @@ def __init__(
144147
self.labels = labels
145148
self.message_storage_policy = message_storage_policy
146149
self.kms_key_name = kms_key_name
150+
self.schema_settings = schema_settings
151+
self.message_retention_duration = message_retention_duration
147152
self.retry = retry
148153
self.timeout = timeout
149154
self.metadata = metadata
@@ -163,6 +168,8 @@ def execute(self, context: Context) -> None:
163168
labels=self.labels,
164169
message_storage_policy=self.message_storage_policy,
165170
kms_key_name=self.kms_key_name,
171+
schema_settings=self.schema_settings,
172+
message_retention_duration=self.message_retention_duration,
166173
retry=self.retry,
167174
timeout=self.timeout,
168175
metadata=self.metadata,

tests/providers/google/cloud/hooks/test_pubsub.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,14 @@ def test_create_nonexistent_topic(self, mock_service):
103103
create_method = mock_service.return_value.create_topic
104104
self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC)
105105
create_method.assert_called_once_with(
106-
request=dict(name=EXPANDED_TOPIC, labels=LABELS, message_storage_policy=None, kms_key_name=None),
106+
request=dict(
107+
name=EXPANDED_TOPIC,
108+
labels=LABELS,
109+
message_storage_policy=None,
110+
kms_key_name=None,
111+
schema_settings=None,
112+
message_retention_duration=None,
113+
),
107114
retry=DEFAULT,
108115
timeout=None,
109116
metadata=(),

tests/providers/google/cloud/operators/test_pubsub.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ def test_failifexists(self, mock_hook):
5959
labels=None,
6060
message_storage_policy=None,
6161
kms_key_name=None,
62+
schema_settings=None,
63+
message_retention_duration=None,
6264
retry=DEFAULT,
6365
timeout=None,
6466
metadata=(),
@@ -79,6 +81,8 @@ def test_succeedifexists(self, mock_hook):
7981
labels=None,
8082
message_storage_policy=None,
8183
kms_key_name=None,
84+
schema_settings=None,
85+
message_retention_duration=None,
8286
retry=DEFAULT,
8387
timeout=None,
8488
metadata=(),

0 commit comments

Comments
 (0)