Skip to content

Commit 8ead33a

Browse files
committed
GH-3989 Consider the custom name of the reply topic name in sendAndReceive
Signed-off-by: mipo256 <[email protected]>
1 parent 8126e4b commit 8ead33a

File tree

3 files changed

+124
-6
lines changed

3 files changed

+124
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,16 @@ public final class CorrelationKey {
3535

3636
private final byte[] correlationId;
3737

38+
/**
39+
* Cached hex representation of the {@link #correlationId}.
40+
* TODO: Migrate to stable values JEP 502
41+
*/
3842
private @Nullable String asString;
3943

44+
/**
45+
* Cached hash code.
46+
* TODO: Migrate to stable values JEP 502
47+
*/
4048
private volatile @Nullable Integer hashCode;
4149

4250
public CorrelationKey(byte[] correlationId) { // NOSONAR array reference

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
* @author Artem Bilan
7474
* @author Borahm Lee
7575
* @author Francois Rosiere
76+
* @author Mikhail Polivakha
7677
*
7778
* @since 2.1.3
7879
*
@@ -422,12 +423,13 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @
422423
CorrelationKey correlationId = this.correlationStrategy.apply(record);
423424
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
424425
Headers headers = record.headers();
425-
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
426+
boolean hasReplyTopic = headers.lastHeader(this.replyTopicHeaderName) != null;
426427
if (!hasReplyTopic && this.replyTopic != null) {
427428
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
428-
if (this.replyPartition != null) {
429-
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
430-
}
429+
}
430+
boolean hasReplyPartition = headers.lastHeader(this.replyPartitionHeaderName) != null;
431+
if (!hasReplyPartition && this.replyPartition != null) {
432+
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
431433
}
432434
Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString();
433435
byte[] correlationValue = this.binaryCorrelation

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,14 @@
102102
* @author Gary Russell
103103
* @author Nathan Xu
104104
* @author Soby Chacko
105+
* @author Mikhail Polivakha
105106
* @since 2.1.3
106107
*
107108
*/
108109
@SpringJUnitConfig
109110
@DirtiesContext
110-
@EmbeddedKafka(partitions = 5, topics = { ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
111+
@EmbeddedKafka(partitions = 5, topics = {
112+
ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
111113
ReplyingKafkaTemplateTests.B_REPLY, ReplyingKafkaTemplateTests.B_REQUEST,
112114
ReplyingKafkaTemplateTests.C_REPLY, ReplyingKafkaTemplateTests.C_REQUEST,
113115
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
@@ -119,7 +121,10 @@
119121
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
120122
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
121123
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
122-
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST })
124+
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST,
125+
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REQUEST,
126+
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST
127+
})
123128
public class ReplyingKafkaTemplateTests {
124129

125130
public static final String A_REPLY = "aReply";
@@ -174,6 +179,14 @@ public class ReplyingKafkaTemplateTests {
174179

175180
public static final String M_REQUEST = "mRequest";
176181

182+
public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY";
183+
184+
public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST";
185+
186+
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY";
187+
188+
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST";
189+
177190
@Autowired
178191
private EmbeddedKafkaBroker embeddedKafka;
179192

@@ -365,6 +378,54 @@ public void testHandlerReturn() throws Exception {
365378
}
366379
}
367380

381+
@Test
382+
public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception {
383+
String customReplyHeaderName = "X-Custom-Reply-Header";
384+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_REPLY);
385+
template.setReplyTopicHeaderName(customReplyHeaderName);
386+
try {
387+
Message<String> message = MessageBuilder.withPayload("expected_message")
388+
.setHeader(customReplyHeaderName, CUSTOM_REPLY_HEADER_REPLY)
389+
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_REQUEST)
390+
.build();
391+
392+
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
393+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
394+
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
395+
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
396+
}
397+
finally {
398+
template.stop();
399+
template.destroy();
400+
}
401+
}
402+
403+
@Test
404+
public void testCustomReplyHeadersAreNotDuplicated() throws Exception {
405+
String customReplyTopicHeaderName = "X-Custom-Reply-Header";
406+
String customReplyPartitionHeaderName = "X-Custom-Reply-Partition";
407+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY);
408+
template.setReplyTopicHeaderName(customReplyTopicHeaderName);
409+
template.setReplyPartitionHeaderName(customReplyPartitionHeaderName);
410+
411+
try {
412+
Message<String> message = MessageBuilder.withPayload("expected_message")
413+
.setHeader(customReplyTopicHeaderName, CUSTOM_REPLY_HEADER_REPLY)
414+
.setHeader(customReplyPartitionHeaderName, "test-partition")
415+
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
416+
.build();
417+
418+
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
419+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
420+
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
421+
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
422+
}
423+
finally {
424+
template.stop();
425+
template.destroy();
426+
}
427+
}
428+
368429
@Test
369430
public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
370431
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
@@ -871,6 +932,14 @@ void testMessageIterableReturn() throws Exception {
871932
}
872933
}
873934

935+
private static int length(Iterable<?> iterable) {
936+
int counter = 0;
937+
for (Object o : iterable) {
938+
counter++;
939+
}
940+
return counter;
941+
}
942+
874943
@Configuration
875944
@EnableKafka
876945
public static class Config {
@@ -1046,6 +1115,45 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
10461115
return Collections.singletonList(message);
10471116
}
10481117

1118+
@KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST)
1119+
@SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back
1120+
public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) {
1121+
Headers headers = inputMessage.headers();
1122+
1123+
if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
1124+
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
1125+
}
1126+
1127+
if (length(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) {
1128+
return "It is expected that the user does NOT specify the reply partition in this test case";
1129+
}
1130+
1131+
if (!"expected_message".equals(inputMessage.value())) {
1132+
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
1133+
}
1134+
1135+
return "OK";
1136+
}
1137+
1138+
@KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
1139+
@SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back
1140+
public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) {
1141+
Headers headers = inputMessage.headers();
1142+
1143+
if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
1144+
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
1145+
}
1146+
1147+
if (length(headers.headers("X-Custom-Reply-Partition")) != 1) {
1148+
return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION);
1149+
}
1150+
1151+
if (!"expected_message".equals(inputMessage.value())) {
1152+
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
1153+
}
1154+
1155+
return "OK";
1156+
}
10491157
}
10501158

10511159
@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)

0 commit comments

Comments
 (0)