Skip to content

Commit 9bcc433

Browse files
fix: shutdown grpc stubs properly when a subscriber is stopped (#74)
* Modifying Publish example in README to match other examples given, and fix issue #6784 * fix: Modifying Publish example in README to match other examples, and fix Issue #11 * feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. * Fix formatting * fix: making changes requested in pull request * fix: creating fix to not populate delivery attempt attribute when dead lettering is not enabled * Adding unit test for case in which a received message has no delivery attempt * Making MessageWaiter class more generic to also be used for outstanding ack operations * Waiting for acks to complete before shutting down a streaming subscriber connection * Fixing formatting error
1 parent 052ed86 commit 9bcc433

File tree

6 files changed

+43
-31
lines changed

6 files changed

+43
-31
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class MessageDispatcher {
7272
private final AckProcessor ackProcessor;
7373

7474
private final FlowController flowController;
75-
private final MessageWaiter messagesWaiter;
75+
private final Waiter messagesWaiter;
7676

7777
// Maps ID to "total expiration time". If it takes longer than this, stop extending.
7878
private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();
@@ -145,7 +145,7 @@ private void forget() {
145145
return;
146146
}
147147
flowController.release(1, outstandingBytes);
148-
messagesWaiter.incrementPendingMessages(-1);
148+
messagesWaiter.incrementPendingCount(-1);
149149
}
150150

151151
@Override
@@ -205,7 +205,7 @@ void sendAckOperations(
205205
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
206206
this.ackLatencyDistribution = ackLatencyDistribution;
207207
jobLock = new ReentrantLock();
208-
messagesWaiter = new MessageWaiter();
208+
messagesWaiter = new Waiter();
209209
this.clock = clock;
210210
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
211211
}
@@ -268,7 +268,7 @@ public void run() {
268268
}
269269

270270
void stop() {
271-
messagesWaiter.waitNoMessages();
271+
messagesWaiter.waitComplete();
272272
jobLock.lock();
273273
try {
274274
if (backgroundJob != null) {
@@ -331,9 +331,9 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
331331
}
332332

333333
private void processBatch(List<OutstandingMessage> batch) {
334-
messagesWaiter.incrementPendingMessages(batch.size());
334+
messagesWaiter.incrementPendingCount(batch.size());
335335
for (OutstandingMessage message : batch) {
336-
// This is a blocking flow controller. We have already incremented MessageWaiter, so
336+
// This is a blocking flow controller. We have already incremented messagesWaiter, so
337337
// shutdown will block on processing of all these messages anyway.
338338
try {
339339
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class Publisher {
104104

105105
private final AtomicBoolean shutdown;
106106
private final BackgroundResource backgroundResources;
107-
private final MessageWaiter messagesWaiter;
107+
private final Waiter messagesWaiter;
108108
private ScheduledFuture<?> currentAlarmFuture;
109109
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;
110110

@@ -173,7 +173,7 @@ private Publisher(Builder builder) throws IOException {
173173
backgroundResourceList.add(publisherStub);
174174
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
175175
shutdown = new AtomicBoolean(false);
176-
messagesWaiter = new MessageWaiter();
176+
messagesWaiter = new Waiter();
177177
}
178178

179179
/** Topic which the publisher publishes to. */
@@ -249,7 +249,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
249249
messagesBatchLock.unlock();
250250
}
251251

252-
messagesWaiter.incrementPendingMessages(1);
252+
messagesWaiter.incrementPendingCount(1);
253253

254254
// For messages without ordering keys, it is okay to send batches without holding
255255
// messagesBatchLock.
@@ -423,7 +423,7 @@ public void onSuccess(PublishResponse result) {
423423
}
424424
}
425425
} finally {
426-
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
426+
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
427427
}
428428
}
429429

@@ -432,7 +432,7 @@ public void onFailure(Throwable t) {
432432
try {
433433
outstandingBatch.onFailure(t);
434434
} finally {
435-
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
435+
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
436436
}
437437
}
438438
};

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
7272

7373
private final AtomicLong channelReconnectBackoffMillis =
7474
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
75+
private final Waiter ackOperationsWaiter = new Waiter();
7576

7677
private final Lock lock = new ReentrantLock();
7778
private ClientStream<StreamingPullRequest> clientStream;
@@ -116,6 +117,7 @@ protected void doStart() {
116117
@Override
117118
protected void doStop() {
118119
messageDispatcher.stop();
120+
ackOperationsWaiter.waitComplete();
119121

120122
lock.lock();
121123
try {
@@ -273,16 +275,18 @@ public void sendAckOperations(
273275
new ApiFutureCallback<Empty>() {
274276
@Override
275277
public void onSuccess(Empty empty) {
276-
// noop
278+
ackOperationsWaiter.incrementPendingCount(-1);
277279
}
278280

279281
@Override
280282
public void onFailure(Throwable t) {
283+
ackOperationsWaiter.incrementPendingCount(-1);
281284
Level level = isAlive() ? Level.WARNING : Level.FINER;
282285
logger.log(level, "failed to send operations", t);
283286
}
284287
};
285288

289+
int pendingOperations = 0;
286290
for (PendingModifyAckDeadline modack : ackDeadlineExtensions) {
287291
for (List<String> idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
288292
ApiFuture<Empty> future =
@@ -294,6 +298,7 @@ public void onFailure(Throwable t) {
294298
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
295299
.build());
296300
ApiFutures.addCallback(future, loggingCallback, directExecutor());
301+
pendingOperations++;
297302
}
298303
}
299304

@@ -306,6 +311,9 @@ public void onFailure(Throwable t) {
306311
.addAllAckIds(idChunk)
307312
.build());
308313
ApiFutures.addCallback(future, loggingCallback, directExecutor());
314+
pendingOperations++;
309315
}
316+
317+
ackOperationsWaiter.incrementPendingCount(pendingOperations);
310318
}
311319
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ public void run() {
305305
// stop connection is no-op if connections haven't been started.
306306
stopAllStreamingConnections();
307307
shutdownBackgroundResources();
308+
subStub.shutdownNow();
308309
notifyStopped();
309310
} catch (Exception e) {
310311
notifyFailed(e);

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java renamed to google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,28 @@
1818

1919
import com.google.api.core.InternalApi;
2020

21-
/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */
22-
class MessageWaiter {
23-
private int pendingMessages;
21+
/**
22+
* A barrier kind of object that helps keep track of pending actions and synchronously wait until
23+
* all have completed.
24+
*/
25+
class Waiter {
26+
private int pendingCount;
2427

25-
MessageWaiter() {
26-
pendingMessages = 0;
28+
Waiter() {
29+
pendingCount = 0;
2730
}
2831

29-
public synchronized void incrementPendingMessages(int messages) {
30-
this.pendingMessages += messages;
31-
if (pendingMessages == 0) {
32+
public synchronized void incrementPendingCount(int delta) {
33+
this.pendingCount += delta;
34+
if (pendingCount == 0) {
3235
notifyAll();
3336
}
3437
}
3538

36-
public synchronized void waitNoMessages() {
39+
public synchronized void waitComplete() {
3740
boolean interrupted = false;
3841
try {
39-
while (pendingMessages > 0) {
42+
while (pendingCount > 0) {
4043
try {
4144
wait();
4245
} catch (InterruptedException e) {
@@ -52,7 +55,7 @@ public synchronized void waitNoMessages() {
5255
}
5356

5457
@InternalApi
55-
public int pendingMessages() {
56-
return pendingMessages;
58+
public int pendingCount() {
59+
return pendingCount;
5760
}
5861
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java renamed to google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import org.junit.runner.RunWith;
2323
import org.junit.runners.JUnit4;
2424

25-
/** Tests for {@link MessageWaiter}. */
25+
/** Tests for {@link Waiter}. */
2626
@RunWith(JUnit4.class)
27-
public class MessageWaiterTest {
27+
public class WaiterTest {
2828

2929
@Test
3030
public void test() throws Exception {
31-
final MessageWaiter waiter = new MessageWaiter();
32-
waiter.incrementPendingMessages(1);
31+
final Waiter waiter = new Waiter();
32+
waiter.incrementPendingCount(1);
3333

3434
final Thread mainThread = Thread.currentThread();
3535
Thread t =
@@ -40,14 +40,14 @@ public void run() {
4040
while (mainThread.getState() != Thread.State.WAITING) {
4141
Thread.yield();
4242
}
43-
waiter.incrementPendingMessages(-1);
43+
waiter.incrementPendingCount(-1);
4444
}
4545
});
4646
t.start();
4747

48-
waiter.waitNoMessages();
48+
waiter.waitComplete();
4949
t.join();
5050

51-
assertEquals(0, waiter.pendingMessages());
51+
assertEquals(0, waiter.pendingCount());
5252
}
5353
}

0 commit comments

Comments
 (0)