Skip to content

Commit 1b8c440

Browse files
authored
fix: PubSubMessage leak on MessageDispatcher (#1197)
1 parent 044c62e commit 1b8c440

File tree

3 files changed

+82
-25
lines changed

3 files changed

+82
-25
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsub.v1;
17+
18+
import com.google.api.core.SettableApiFuture;
19+
20+
public class AckReplyConsumerImpl implements AckReplyConsumer {
21+
final SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture;
22+
23+
public AckReplyConsumerImpl(
24+
final SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture) {
25+
this.ackReplySettableApiFuture = ackReplySettableApiFuture;
26+
}
27+
28+
@Override
29+
public void ack() {
30+
ackReplySettableApiFuture.set(MessageDispatcher.AckReply.ACK);
31+
}
32+
33+
@Override
34+
public void nack() {
35+
ackReplySettableApiFuture.set(MessageDispatcher.AckReply.NACK);
36+
}
37+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsub.v1;
17+
18+
import com.google.api.core.SettableApiFuture;
19+
import java.util.concurrent.Future;
20+
21+
public class AckReplyConsumerWithResponseImpl implements AckReplyConsumerWithResponse {
22+
final SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture;
23+
final SettableApiFuture<AckResponse> messageFuture;
24+
25+
public AckReplyConsumerWithResponseImpl(
26+
SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture,
27+
SettableApiFuture<AckResponse> messageFuture) {
28+
this.ackReplySettableApiFuture = ackReplySettableApiFuture;
29+
this.messageFuture = messageFuture;
30+
}
31+
32+
@Override
33+
public Future<AckResponse> ack() {
34+
ackReplySettableApiFuture.set(MessageDispatcher.AckReply.ACK);
35+
return messageFuture;
36+
}
37+
38+
@Override
39+
public Future<AckResponse> nack() {
40+
ackReplySettableApiFuture.set(MessageDispatcher.AckReply.NACK);
41+
return messageFuture;
42+
}
43+
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.concurrent.ConcurrentHashMap;
3636
import java.util.concurrent.ConcurrentMap;
3737
import java.util.concurrent.Executor;
38-
import java.util.concurrent.Future;
3938
import java.util.concurrent.LinkedBlockingQueue;
4039
import java.util.concurrent.ScheduledExecutorService;
4140
import java.util.concurrent.ScheduledFuture;
@@ -434,33 +433,11 @@ public void run() {
434433
SettableApiFuture<AckResponse> messageFuture =
435434
ackHandler.getMessageFutureIfExists();
436435
final AckReplyConsumerWithResponse ackReplyConsumerWithResponse =
437-
new AckReplyConsumerWithResponse() {
438-
@Override
439-
public Future<AckResponse> ack() {
440-
ackReplySettableApiFuture.set(AckReply.ACK);
441-
return messageFuture;
442-
}
443-
444-
@Override
445-
public Future<AckResponse> nack() {
446-
ackReplySettableApiFuture.set(AckReply.NACK);
447-
return messageFuture;
448-
}
449-
};
436+
new AckReplyConsumerWithResponseImpl(ackReplySettableApiFuture, messageFuture);
450437
receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
451438
} else {
452439
final AckReplyConsumer ackReplyConsumer =
453-
new AckReplyConsumer() {
454-
@Override
455-
public void ack() {
456-
ackReplySettableApiFuture.set(AckReply.ACK);
457-
}
458-
459-
@Override
460-
public void nack() {
461-
ackReplySettableApiFuture.set(AckReply.NACK);
462-
}
463-
};
440+
new AckReplyConsumerImpl(ackReplySettableApiFuture);
464441
receiver.receiveMessage(message, ackReplyConsumer);
465442
}
466443
} catch (Exception e) {

0 commit comments

Comments
 (0)