Skip to content

Commit 738a6ca

Browse files
committed
GH-3989 Consider the custom name of the reply topic name in sendAndReceive
Signed-off-by: mipo256 <[email protected]>
1 parent 8126e4b commit 738a6ca

File tree

4 files changed

+124
-7
lines changed

4 files changed

+124
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,16 @@ public final class CorrelationKey {
3535

3636
private final byte[] correlationId;
3737

38+
/**
39+
* Cached hex representation of the {@link #correlationId}.
40+
* TODO: Migrate to stable values JEP 502
41+
*/
3842
private @Nullable String asString;
3943

44+
/**
45+
* Cached hash code.
46+
* TODO: Migrate to stable values JEP 502
47+
*/
4048
private volatile @Nullable Integer hashCode;
4149

4250
public CorrelationKey(byte[] correlationId) { // NOSONAR array reference

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Instant;
2323
import java.util.Collection;
2424
import java.util.List;
25+
import java.util.Optional;
2526
import java.util.UUID;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
@@ -73,6 +74,7 @@
7374
* @author Artem Bilan
7475
* @author Borahm Lee
7576
* @author Francois Rosiere
77+
* @author Mikhail Polivakha
7678
*
7779
* @since 2.1.3
7880
*
@@ -422,12 +424,13 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @
422424
CorrelationKey correlationId = this.correlationStrategy.apply(record);
423425
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
424426
Headers headers = record.headers();
425-
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
427+
boolean hasReplyTopic = headers.lastHeader(this.replyTopicHeaderName) != null;
426428
if (!hasReplyTopic && this.replyTopic != null) {
427429
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
428-
if (this.replyPartition != null) {
429-
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
430-
}
430+
}
431+
boolean hasReplyPartition = headers.lastHeader(this.replyPartitionHeaderName) != null;
432+
if (hasReplyPartition && this.replyPartition != null) {
433+
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
431434
}
432435
Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString();
433436
byte[] correlationValue = this.binaryCorrelation

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Locale;
2828
import java.util.Map;
29+
import java.util.Optional;
2930
import java.util.Set;
3031
import java.util.stream.Collectors;
3132

@@ -49,7 +50,6 @@
4950
* @author Soby Chacko
5051
*
5152
* @since 2.1.3
52-
*
5353
*/
5454
public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
5555

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,14 @@
102102
* @author Gary Russell
103103
* @author Nathan Xu
104104
* @author Soby Chacko
105+
* @author Mikhail Polivakha
105106
* @since 2.1.3
106107
*
107108
*/
108109
@SpringJUnitConfig
109110
@DirtiesContext
110-
@EmbeddedKafka(partitions = 5, topics = { ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
111+
@EmbeddedKafka(partitions = 5, topics = {
112+
ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
111113
ReplyingKafkaTemplateTests.B_REPLY, ReplyingKafkaTemplateTests.B_REQUEST,
112114
ReplyingKafkaTemplateTests.C_REPLY, ReplyingKafkaTemplateTests.C_REQUEST,
113115
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
@@ -119,7 +121,10 @@
119121
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
120122
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
121123
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
122-
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST })
124+
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST,
125+
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REQUEST,
126+
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST
127+
})
123128
public class ReplyingKafkaTemplateTests {
124129

125130
public static final String A_REPLY = "aReply";
@@ -174,6 +179,14 @@ public class ReplyingKafkaTemplateTests {
174179

175180
public static final String M_REQUEST = "mRequest";
176181

182+
public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY";
183+
184+
public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST";
185+
186+
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY";
187+
188+
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST";
189+
177190
@Autowired
178191
private EmbeddedKafkaBroker embeddedKafka;
179192

@@ -365,6 +378,54 @@ public void testHandlerReturn() throws Exception {
365378
}
366379
}
367380

381+
@Test
382+
public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception {
383+
String customReplyHeaderName = "X-Custom-Reply-Header";
384+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_REPLY);
385+
template.setReplyTopicHeaderName(customReplyHeaderName);
386+
try {
387+
Message<String> message = MessageBuilder.withPayload("expected_message")
388+
.setHeader(customReplyHeaderName, CUSTOM_REPLY_HEADER_REPLY)
389+
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_REQUEST)
390+
.build();
391+
392+
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
393+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
394+
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
395+
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
396+
}
397+
finally {
398+
template.stop();
399+
template.destroy();
400+
}
401+
}
402+
403+
@Test
404+
public void testCustomReplyHeadersAreNotDuplicated() throws Exception {
405+
String customReplyTopicHeaderName = "X-Custom-Reply-Header";
406+
String customReplyPartitionHeaderName = "X-Custom-Reply-Partition";
407+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY);
408+
template.setReplyTopicHeaderName(customReplyTopicHeaderName);
409+
template.setReplyPartitionHeaderName(customReplyPartitionHeaderName);
410+
411+
try {
412+
Message<String> message = MessageBuilder.withPayload("expected_message")
413+
.setHeader(customReplyTopicHeaderName, CUSTOM_REPLY_HEADER_REPLY)
414+
.setHeader(customReplyPartitionHeaderName, "test-partition")
415+
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
416+
.build();
417+
418+
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
419+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
420+
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
421+
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
422+
}
423+
finally {
424+
template.stop();
425+
template.destroy();
426+
}
427+
}
428+
368429
@Test
369430
public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
370431
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
@@ -1046,6 +1107,45 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
10461107
return Collections.singletonList(message);
10471108
}
10481109

1110+
@KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST)
1111+
@SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back
1112+
public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) {
1113+
Headers headers = inputMessage.headers();
1114+
1115+
if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
1116+
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
1117+
}
1118+
1119+
if (length(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) {
1120+
return "It is expected that the user does NOT specify the reply partition in this test case";
1121+
}
1122+
1123+
if (!"expected_message".equals(inputMessage.value())) {
1124+
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
1125+
}
1126+
1127+
return "OK";
1128+
}
1129+
1130+
@KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
1131+
@SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back
1132+
public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) {
1133+
Headers headers = inputMessage.headers();
1134+
1135+
if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
1136+
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
1137+
}
1138+
1139+
if (length(headers.headers("X-Custom-Reply-Partition")) != 1) {
1140+
return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION);
1141+
}
1142+
1143+
if (!"expected_message".equals(inputMessage.value())) {
1144+
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
1145+
}
1146+
1147+
return "OK";
1148+
}
10491149
}
10501150

10511151
@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
@@ -1090,6 +1190,12 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
10901190

10911191
}
10921192

1193+
private static int length(Iterable<?> iterable) {
1194+
int counter = 0;
1195+
for (; iterable.iterator().hasNext(); counter++) {}
1196+
return counter;
1197+
}
1198+
10931199
public static class Foo {
10941200

10951201
private String bar;

0 commit comments

Comments
 (0)