|
33 | 33 | import java.util.concurrent.atomic.AtomicInteger;
|
34 | 34 | import java.util.concurrent.atomic.AtomicReference;
|
35 | 35 |
|
| 36 | +import com.google.common.collect.Iterables; |
36 | 37 | import org.apache.kafka.clients.consumer.Consumer;
|
37 | 38 | import org.apache.kafka.clients.consumer.ConsumerConfig;
|
38 | 39 | import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
|
102 | 103 | * @author Gary Russell
|
103 | 104 | * @author Nathan Xu
|
104 | 105 | * @author Soby Chacko
|
| 106 | + * @author Mikhail Polivakha |
105 | 107 | * @since 2.1.3
|
106 | 108 | *
|
107 | 109 | */
|
108 | 110 | @SpringJUnitConfig
|
109 | 111 | @DirtiesContext
|
110 |
| -@EmbeddedKafka(partitions = 5, topics = { ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST, |
| 112 | +@EmbeddedKafka(partitions = 5, topics = { |
| 113 | + ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST, |
111 | 114 | ReplyingKafkaTemplateTests.B_REPLY, ReplyingKafkaTemplateTests.B_REQUEST,
|
112 | 115 | ReplyingKafkaTemplateTests.C_REPLY, ReplyingKafkaTemplateTests.C_REQUEST,
|
113 | 116 | ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
|
|
119 | 122 | ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
|
120 | 123 | ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
|
121 | 124 | ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
|
122 |
| - ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST }) |
| 125 | + ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST, |
| 126 | + ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REQUEST, |
| 127 | + ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST |
| 128 | +}) |
123 | 129 | public class ReplyingKafkaTemplateTests {
|
124 | 130 |
|
125 | 131 | public static final String A_REPLY = "aReply";
|
@@ -174,6 +180,18 @@ public class ReplyingKafkaTemplateTests {
|
174 | 180 |
|
175 | 181 | public static final String M_REQUEST = "mRequest";
|
176 | 182 |
|
| 183 | + // GH-3989 |
| 184 | + public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY"; |
| 185 | + |
| 186 | + // GH-3989 |
| 187 | + public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST"; |
| 188 | + |
| 189 | + // GH-3989 |
| 190 | + public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY"; |
| 191 | + |
| 192 | + // GH-3989 |
| 193 | + public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST"; |
| 194 | + |
177 | 195 | @Autowired
|
178 | 196 | private EmbeddedKafkaBroker embeddedKafka;
|
179 | 197 |
|
@@ -365,6 +383,54 @@ public void testHandlerReturn() throws Exception {
|
365 | 383 | }
|
366 | 384 | }
|
367 | 385 |
|
| 386 | + @Test |
| 387 | + public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception { |
| 388 | + String customReplyHeaderName = "X-Custom-Reply-Header"; |
| 389 | + ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_REPLY); |
| 390 | + template.setReplyTopicHeaderName(customReplyHeaderName); |
| 391 | + try { |
| 392 | + Message<String> message = MessageBuilder.withPayload("foo") |
| 393 | + .setHeader(customReplyHeaderName, CUSTOM_REPLY_HEADER_REPLY) |
| 394 | + .setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_REQUEST) |
| 395 | + .build(); |
| 396 | + |
| 397 | + RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30)); |
| 398 | + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok |
| 399 | + Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS); |
| 400 | + assertThat(resultingMessage.getPayload()).isEqualTo("OK"); |
| 401 | + } |
| 402 | + finally { |
| 403 | + template.stop(); |
| 404 | + template.destroy(); |
| 405 | + } |
| 406 | + } |
| 407 | + |
| 408 | + @Test |
| 409 | + public void testCustomReplyHeadersAreNotDuplicated() throws Exception { |
| 410 | + String customReplyTopicHeaderName = "X-Custom-Reply-Header"; |
| 411 | + String customReplyPartitionHeaderName = "X-Custom-Reply-Partition"; |
| 412 | + ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY); |
| 413 | + template.setReplyTopicHeaderName(customReplyTopicHeaderName); |
| 414 | + template.setReplyPartitionHeaderName(customReplyPartitionHeaderName); |
| 415 | + |
| 416 | + try { |
| 417 | + Message<String> message = MessageBuilder.withPayload("foo") |
| 418 | + .setHeader(customReplyTopicHeaderName, CUSTOM_REPLY_HEADER_REPLY) |
| 419 | + .setHeader(customReplyPartitionHeaderName, "test-partition") |
| 420 | + .setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST) |
| 421 | + .build(); |
| 422 | + |
| 423 | + RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30)); |
| 424 | + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok |
| 425 | + Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS); |
| 426 | + assertThat(resultingMessage.getPayload()).isEqualTo("OK"); |
| 427 | + } |
| 428 | + finally { |
| 429 | + template.stop(); |
| 430 | + template.destroy(); |
| 431 | + } |
| 432 | + } |
| 433 | + |
368 | 434 | @Test
|
369 | 435 | public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
|
370 | 436 | ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
|
@@ -1046,6 +1112,47 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
|
1046 | 1112 | return Collections.singletonList(message);
|
1047 | 1113 | }
|
1048 | 1114 |
|
| 1115 | + // GH-3989 |
| 1116 | + @KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST) |
| 1117 | + @SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back |
| 1118 | + public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) { |
| 1119 | + Headers headers = inputMessage.headers(); |
| 1120 | + |
| 1121 | + if (Iterables.size(headers.headers("X-Custom-Reply-Header")) != 1) { |
| 1122 | + return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once"; |
| 1123 | + } |
| 1124 | + |
| 1125 | + if (Iterables.size(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) { |
| 1126 | + return "It is expected that the user does NOT specify the reply partition in this test case"; |
| 1127 | + } |
| 1128 | + |
| 1129 | + if (!"foo".equals(inputMessage.value())) { |
| 1130 | + return "Expected message is 'foo', but got %s".formatted(inputMessage.value()); |
| 1131 | + } |
| 1132 | + |
| 1133 | + return "OK"; |
| 1134 | + } |
| 1135 | + |
| 1136 | + // GH-3989 |
| 1137 | + @KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST) |
| 1138 | + @SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back |
| 1139 | + public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) { |
| 1140 | + Headers headers = inputMessage.headers(); |
| 1141 | + |
| 1142 | + if (Iterables.size(headers.headers("X-Custom-Reply-Header")) != 1) { |
| 1143 | + return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once"; |
| 1144 | + } |
| 1145 | + |
| 1146 | + if (Iterables.size(headers.headers("X-Custom-Reply-Partition")) != 1) { |
| 1147 | + return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION); |
| 1148 | + } |
| 1149 | + |
| 1150 | + if (!"foo".equals(inputMessage.value())) { |
| 1151 | + return "Expected message is 'foo', but got %s".formatted(inputMessage.value()); |
| 1152 | + } |
| 1153 | + |
| 1154 | + return "OK"; |
| 1155 | + } |
1049 | 1156 | }
|
1050 | 1157 |
|
1051 | 1158 | @KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
|
|
0 commit comments