Skip to content

Commit 1e7c0a6

Browse files
Gurov Ilyapradn
authored andcommitted
feat(pubsub): add stop method (#9365)
1 parent 879d9a1 commit 1e7c0a6

File tree

4 files changed

+87
-24
lines changed

4 files changed

+87
-24
lines changed

pubsub/google/cloud/pubsub_v1/futures.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ def running(self):
7474
bool: ``True`` if this method has not yet completed, or
7575
``False`` if it has completed.
7676
"""
77-
if self.done():
78-
return False
79-
return True
77+
return not self.done()
8078

8179
def done(self):
8280
"""Return True the future is done, False otherwise.

pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ def __init__(self, client, topic, settings, autocommit=True):
7474
self._state_lock = threading.Lock()
7575
# These members are all communicated between threads; ensure that
7676
# any writes to them use the "state lock" to remain atomic.
77+
# _futures list should remain unchanged after batch
78+
# status changed from ACCEPTING_MESSAGES to any other
79+
# in order to avoid race conditions
7780
self._futures = []
7881
self._messages = []
7982
self._size = 0

pubsub/google/cloud/pubsub_v1/publisher/client.py

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs):
134134
# messages. One batch exists for each topic.
135135
self._batch_lock = self._batch_class.make_lock()
136136
self._batches = {}
137+
self._is_stopped = False
137138

138139
@classmethod
139140
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
@@ -187,20 +188,19 @@ def _batch(self, topic, create=False, autocommit=True):
187188
"""
188189
# If there is no matching batch yet, then potentially create one
189190
# and place it on the batches dictionary.
190-
with self._batch_lock:
191-
if not create:
192-
batch = self._batches.get(topic)
193-
if batch is None:
194-
create = True
195-
196-
if create:
197-
batch = self._batch_class(
198-
autocommit=autocommit,
199-
client=self,
200-
settings=self.batch_settings,
201-
topic=topic,
202-
)
203-
self._batches[topic] = batch
191+
if not create:
192+
batch = self._batches.get(topic)
193+
if batch is None:
194+
create = True
195+
196+
if create:
197+
batch = self._batch_class(
198+
autocommit=autocommit,
199+
client=self,
200+
settings=self.batch_settings,
201+
topic=topic,
202+
)
203+
self._batches[topic] = batch
204204

205205
return batch
206206

@@ -242,12 +242,17 @@ def publish(self, topic, data, **attrs):
242242
instance that conforms to Python Standard library's
243243
:class:`~concurrent.futures.Future` interface (but not an
244244
instance of that class).
245+
246+
Raises:
247+
RuntimeError:
248+
If called after publisher has been stopped
249+
by a `stop()` method call.
245250
"""
246251
# Sanity check: Is the data being sent as a bytestring?
247252
# If it is literally anything else, complain loudly about it.
248253
if not isinstance(data, six.binary_type):
249254
raise TypeError(
250-
"Data being published to Pub/Sub must be sent " "as a bytestring."
255+
"Data being published to Pub/Sub must be sent as a bytestring."
251256
)
252257

253258
# Coerce all attributes to text strings.
@@ -266,11 +271,38 @@ def publish(self, topic, data, **attrs):
266271
message = types.PubsubMessage(data=data, attributes=attrs)
267272

268273
# Delegate the publishing to the batch.
269-
batch = self._batch(topic)
270-
future = None
271-
while future is None:
272-
future = batch.publish(message)
273-
if future is None:
274-
batch = self._batch(topic, create=True)
274+
with self._batch_lock:
275+
if self._is_stopped:
276+
raise RuntimeError("Cannot publish on a stopped publisher.")
277+
278+
batch = self._batch(topic)
279+
future = None
280+
while future is None:
281+
future = batch.publish(message)
282+
if future is None:
283+
batch = self._batch(topic, create=True)
275284

276285
return future
286+
287+
def stop(self):
288+
"""Immediately publish all outstanding messages.
289+
290+
Asynchronously sends all outstanding messages and
291+
prevents future calls to `publish()`. Method should
292+
be invoked prior to deleting this `Client()` object
293+
in order to ensure that no pending messages are lost.
294+
295+
.. note::
296+
297+
This method is non-blocking. Use `Future()` objects
298+
returned by `publish()` to make sure all publish
299+
requests completed, either in success or error.
300+
"""
301+
with self._batch_lock:
302+
if self._is_stopped:
303+
raise RuntimeError("Cannot stop a publisher already stopped.")
304+
305+
self._is_stopped = True
306+
307+
for batch in self._batches.values():
308+
batch.commit()

pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,36 @@ def test_publish_attrs_type_error():
201201
client.publish(topic, b"foo", answer=42)
202202

203203

204+
def test_stop():
205+
creds = mock.Mock(spec=credentials.Credentials)
206+
client = publisher.Client(credentials=creds)
207+
208+
batch = client._batch("topic1", autocommit=False)
209+
batch2 = client._batch("topic2", autocommit=False)
210+
211+
pubsub_msg = types.PubsubMessage(data=b"msg")
212+
213+
patch = mock.patch.object(batch, "commit")
214+
patch2 = mock.patch.object(batch2, "commit")
215+
216+
with patch as commit_mock, patch2 as commit_mock2:
217+
batch.publish(pubsub_msg)
218+
batch2.publish(pubsub_msg)
219+
220+
client.stop()
221+
222+
# check if commit() called
223+
commit_mock.assert_called()
224+
commit_mock2.assert_called()
225+
226+
# check that closed publisher doesn't accept new messages
227+
with pytest.raises(RuntimeError):
228+
client.publish("topic1", b"msg2")
229+
230+
with pytest.raises(RuntimeError):
231+
client.stop()
232+
233+
204234
def test_gapic_instance_method():
205235
creds = mock.Mock(spec=credentials.Credentials)
206236
client = publisher.Client(credentials=creds)

0 commit comments

Comments
 (0)