Skip to content

Commit db522b6

Browse files
michaelpri10gcf-owl-bot[bot]cloud-java-bot
authored
feat: Add OpenTelemetry tracing to the Publisher and Subscriber (googleapis#2086)
* feat: Initial publish side Open Telemetry support * feat: Publish-side trace context injection * feat: Tests and improvements to publish side OTel tracing * feat: More tests and refactoring for publish-side OpenTelemetry * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: Formatting files * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: Publisher test changes * test: Fix OpenTelemetry test * Feat: Use OpenTelemetry semconv * test: Fix some dependency issues * feat: Test fix * feat: Add comment for setter in builder * Opentelemetry subscribe (googleapis#2100) * feat: Add OpenTelemetry tracing to the SubscriberClient * feat: Add link to publisher create span in the subscribe process span * feat: Add Ack/Nack/ModAck RPC spans to the subscribe * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Opentelemetry subscribe (googleapis#2101) * feat: Add OpenTelemetry tracing to the SubscriberClient * feat: Add link to publisher create span in the subscribe process span * feat: Add Ack/Nack/ModAck RPC spans to the subscribe * fix: Fix test errors caused by otel changes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: Fix build errors in Publisher * test: Ignore org.assertj:assertj-core which is required for OTel testing assertions * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * test: Add tests for subscriber OTel functions * feat: Changes to OpenTelemetry implementation to add links earlier and prevent methods from being exposed to users * feat: Refactor OpenTelemetry implementation to use a context aware wrapper for the tracer and a PubsubTracer interface * feat: Initialize default no-op PubsubTracer in Publisher and Subscriber * feat: Ensure SubscriberStreamingConnection and MessageDispatcher have default no-op tracers by default for tests * samples: Add OpenTelemetry publisher and subscriber samples * feat: Add additional sampling checks to the Otel implementation * samples: Update pom.xml for samples with Cloud Trace exporter * feat: Make OTel classes/methods package-private and remove non-generic PubsubTracer interface * feat: Lint fixes for Pub/Sub * feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names * feat: Format OTel changes * Revert "feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names" This reverts commit 305610e. * feat: trigger build * chore: generate libraries at Mon Sep 30 20:37:03 UTC 2024 * feat: trigger build * feat: Fix file overwrite from bad merge * chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024 * Revert "chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024" This reverts commit 5ebbbf9. * chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024 * Revert "chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024" This reverts commit 23f3a70. * chore: generate libraries at Mon Sep 30 21:14:11 UTC 2024 * feat: Prevent new files for OpenTelemetry from being overwritten * feat: Revert automated file deletion for OpenTelemetry changes * feat: Remove OpenTelemetry samples as the samples use a released library version to run * chore: generate libraries at Mon Sep 30 22:11:14 UTC 2024 --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: cloud-java-bot <[email protected]>
1 parent c126927 commit db522b6

File tree

16 files changed

+2008
-31
lines changed

16 files changed

+2008
-31
lines changed

.github/.OwlBot-hermetic.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ deep-preserve-regex:
3333
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java"
3434
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java"
3535
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java"
36+
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java"
3637
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java"
3738
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java"
3839
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StatusUtilTest.java"
@@ -51,8 +52,10 @@ deep-preserve-regex:
5152
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java"
5253
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java"
5354
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java"
55+
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java"
5456
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java"
5557
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PublisherInterface.java"
58+
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java"
5659
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java"
5760
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java"
5861
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java"

google-cloud-pubsub/pom.xml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@
100100
<artifactId>google-http-client</artifactId>
101101
<scope>runtime</scope>
102102
</dependency>
103+
<dependency>
104+
<groupId>io.opentelemetry</groupId>
105+
<artifactId>opentelemetry-api</artifactId>
106+
</dependency>
107+
<dependency>
108+
<groupId>io.opentelemetry</groupId>
109+
<artifactId>opentelemetry-context</artifactId>
110+
</dependency>
111+
<dependency>
112+
<groupId>io.opentelemetry</groupId>
113+
<artifactId>opentelemetry-semconv</artifactId>
114+
</dependency>
103115

104116
<!-- Test dependencies -->
105117
<dependency>
@@ -142,6 +154,21 @@
142154
<artifactId>opencensus-impl</artifactId>
143155
<scope>test</scope>
144156
</dependency>
157+
<dependency>
158+
<groupId>io.opentelemetry</groupId>
159+
<artifactId>opentelemetry-sdk-trace</artifactId>
160+
<scope>test</scope>
161+
</dependency>
162+
<dependency>
163+
<groupId>io.opentelemetry</groupId>
164+
<artifactId>opentelemetry-sdk-testing</artifactId>
165+
<scope>test</scope>
166+
</dependency>
167+
<dependency>
168+
<groupId>org.assertj</groupId>
169+
<artifactId>assertj-core</artifactId>
170+
<scope>test</scope>
171+
</dependency>
145172
<!-- Need testing utility classes for generated gRPC clients tests -->
146173
<dependency>
147174
<groupId>com.google.api</groupId>
@@ -174,6 +201,7 @@
174201
<ignoredUnusedDeclaredDependency>com.google.auth:google-auth-library-oauth2-http:jar</ignoredUnusedDeclaredDependency>
175202
<ignoredUnusedDeclaredDependency>io.opencensus:opencensus-impl</ignoredUnusedDeclaredDependency>
176203
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
204+
<ignoredUnusedDeclaredDependency>org.assertj:assertj-core</ignoredUnusedDeclaredDependency>
177205
</ignoredUnusedDeclaredDependencies>
178206
</configuration>
179207
</plugin>

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
public class AckRequestData {
2323
private final String ackId;
2424
private final Optional<SettableApiFuture<AckResponse>> messageFuture;
25+
private PubsubMessageWrapper messageWrapper;
2526

2627
protected AckRequestData(Builder builder) {
2728
this.ackId = builder.ackId;
2829
this.messageFuture = builder.messageFuture;
30+
this.messageWrapper = builder.messageWrapper;
2931
}
3032

3133
public String getAckId() {
@@ -36,6 +38,17 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
3638
return this.messageFuture.orElse(null);
3739
}
3840

41+
/**
42+
* Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods
43+
* that use this method to be unit tested.
44+
*/
45+
public PubsubMessageWrapper getMessageWrapper() {
46+
if (this.messageWrapper == null) {
47+
return PubsubMessageWrapper.newBuilder(null, null).build();
48+
}
49+
return messageWrapper;
50+
}
51+
3952
public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
4053
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
4154
switch (ackResponse) {
@@ -68,6 +81,7 @@ public static Builder newBuilder(String ackId) {
6881
protected static final class Builder {
6982
private final String ackId;
7083
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();
84+
private PubsubMessageWrapper messageWrapper;
7185

7286
protected Builder(String ackId) {
7387
this.ackId = ackId;
@@ -78,6 +92,11 @@ public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
7892
return this;
7993
}
8094

95+
public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
96+
this.messageWrapper = messageWrapper;
97+
return this;
98+
}
99+
81100
public AckRequestData build() {
82101
return new AckRequestData(this);
83102
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 76 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ class MessageDispatcher {
104104
// To keep track of number of seconds the receiver takes to process messages.
105105
private final Distribution ackLatencyDistribution;
106106

107+
private final String subscriptionName;
108+
private final boolean enableOpenTelemetryTracing;
109+
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
110+
107111
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
108112
public enum AckReply {
109113
ACK,
@@ -157,6 +161,7 @@ public void onFailure(Throwable t) {
157161
t);
158162
this.ackRequestData.setResponse(AckResponse.OTHER, false);
159163
pendingNacks.add(this.ackRequestData);
164+
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
160165
forget();
161166
}
162167

@@ -169,9 +174,11 @@ public void onSuccess(AckReply reply) {
169174
ackLatencyDistribution.record(
170175
Ints.saturatedCast(
171176
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
177+
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
172178
break;
173179
case NACK:
174180
pendingNacks.add(this.ackRequestData);
181+
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
175182
break;
176183
default:
177184
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
@@ -217,6 +224,12 @@ private MessageDispatcher(Builder builder) {
217224
jobLock = new ReentrantLock();
218225
messagesWaiter = new Waiter();
219226
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);
227+
228+
subscriptionName = builder.subscriptionName;
229+
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
230+
if (builder.tracer != null) {
231+
tracer = builder.tracer;
232+
}
220233
}
221234

222235
private boolean shouldSetMessageFuture() {
@@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
351364
}
352365

353366
private static class OutstandingMessage {
354-
private final ReceivedMessage receivedMessage;
355367
private final AckHandler ackHandler;
356368

357-
private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
358-
this.receivedMessage = receivedMessage;
369+
private OutstandingMessage(AckHandler ackHandler) {
359370
this.ackHandler = ackHandler;
360371
}
372+
373+
public PubsubMessageWrapper messageWrapper() {
374+
return this.ackHandler.ackRequestData.getMessageWrapper();
375+
}
361376
}
362377

363378
private static class ReceiptCompleteData {
@@ -390,10 +405,20 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
390405
if (shouldSetMessageFuture()) {
391406
builder.setMessageFuture(SettableApiFuture.create());
392407
}
408+
PubsubMessageWrapper messageWrapper =
409+
PubsubMessageWrapper.newBuilder(
410+
message.getMessage(),
411+
subscriptionName,
412+
message.getAckId(),
413+
message.getDeliveryAttempt())
414+
.build();
415+
builder.setMessageWrapper(messageWrapper);
416+
tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get());
417+
393418
AckRequestData ackRequestData = builder.build();
394419
AckHandler ackHandler =
395420
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
396-
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
421+
OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);
397422

398423
if (this.exactlyOnceDeliveryEnabled.get()) {
399424
// For exactly once deliveries we don't add to outstanding batch because we first
@@ -457,30 +482,40 @@ private void processBatch(List<OutstandingMessage> batch) {
457482
for (OutstandingMessage message : batch) {
458483
// This is a blocking flow controller. We have already incremented messagesWaiter, so
459484
// shutdown will block on processing of all these messages anyway.
485+
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
460486
try {
461-
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
487+
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
488+
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
462489
} catch (FlowControlException unexpectedException) {
463490
// This should be a blocking flow controller and never throw an exception.
491+
tracer.setSubscribeConcurrencyControlSpanException(
492+
message.messageWrapper(), unexpectedException);
464493
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
465494
}
466-
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
495+
addDeliveryInfoCount(message.messageWrapper());
496+
processOutstandingMessage(message.ackHandler);
467497
}
468498
}
469499

470-
private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
471-
PubsubMessage originalMessage = receivedMessage.getMessage();
472-
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
500+
private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
501+
PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
502+
int deliveryAttempt = messageWrapper.getDeliveryAttempt();
473503
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
474504
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
475505
if (deliveryAttempt > 0) {
476-
return PubsubMessage.newBuilder(originalMessage)
477-
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
478-
.build();
506+
messageWrapper.setPubsubMessage(
507+
PubsubMessage.newBuilder(originalMessage)
508+
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
509+
.build());
479510
}
480-
return originalMessage;
481511
}
482512

483-
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
513+
private void processOutstandingMessage(final AckHandler ackHandler) {
514+
// Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
515+
// AckHandler object.
516+
PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
517+
PubsubMessage message = messageWrapper.getPubsubMessage();
518+
484519
// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
485520
// use below in the consumers
486521
SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
@@ -499,8 +534,10 @@ public void run() {
499534
// so it was probably sent to someone else. Don't work on it.
500535
// Don't nack it either, because we'd be nacking someone else's message.
501536
ackHandler.forget();
537+
tracer.setSubscriberSpanExpirationResult(messageWrapper);
502538
return;
503539
}
540+
tracer.startSubscribeProcessSpan(messageWrapper);
504541
if (shouldSetMessageFuture()) {
505542
// This is the message future that is propagated to the user
506543
SettableApiFuture<AckResponse> messageFuture =
@@ -521,7 +558,9 @@ public void run() {
521558
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
522559
executor.execute(deliverMessageTask);
523560
} else {
561+
tracer.startSubscribeSchedulerSpan(messageWrapper);
524562
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
563+
tracer.endSubscribeSchedulerSpan(messageWrapper);
525564
}
526565
}
527566

@@ -607,8 +646,10 @@ void processOutstandingOperations() {
607646
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
608647
pendingReceipts.drainTo(ackRequestDataReceipts);
609648
if (!ackRequestDataReceipts.isEmpty()) {
610-
modackRequestData.add(
611-
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
649+
ModackRequestData receiptModack =
650+
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
651+
receiptModack.setIsReceiptModack(true);
652+
modackRequestData.add(receiptModack);
612653
}
613654
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());
614655

@@ -645,6 +686,10 @@ public static final class Builder {
645686
private ScheduledExecutorService systemExecutor;
646687
private ApiClock clock;
647688

689+
private String subscriptionName;
690+
private boolean enableOpenTelemetryTracing;
691+
private OpenTelemetryPubsubTracer tracer;
692+
648693
protected Builder(MessageReceiver receiver) {
649694
this.receiver = receiver;
650695
}
@@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
715760
return this;
716761
}
717762

763+
public Builder setSubscriptionName(String subscriptionName) {
764+
this.subscriptionName = subscriptionName;
765+
return this;
766+
}
767+
768+
public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
769+
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
770+
return this;
771+
}
772+
773+
public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
774+
this.tracer = tracer;
775+
return this;
776+
}
777+
718778
public MessageDispatcher build() {
719779
return new MessageDispatcher(this);
720780
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
class ModackRequestData {
2222
private final int deadlineExtensionSeconds;
2323
private List<AckRequestData> ackRequestData;
24+
private boolean isReceiptModack;
2425

2526
ModackRequestData(int deadlineExtensionSeconds) {
2627
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
@@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
4546
return ackRequestData;
4647
}
4748

49+
public boolean getIsReceiptModack() {
50+
return isReceiptModack;
51+
}
52+
4853
public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
4954
this.ackRequestData.add(ackRequestData);
5055
return this;
5156
}
57+
58+
public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
59+
this.isReceiptModack = isReceiptModack;
60+
return this;
61+
}
5262
}

0 commit comments

Comments
 (0)