+
+ @Test
+ void split()
+ {
+ kafkaTemplate.send(TOPIC_IN, "beate", "Hello World!");
+
+ Awaitility
+ .await("Receive two words")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
+ assertThat(listener.words.size())
+ .describedAs("Received two words")
+ .isEqualTo(2));
+
+ assertThat(listener.keys)
+ .describedAs("Received unexpected keys")
+ .containsExactly("beate", "beate");
+ assertThat(listener.words)
+ .describedAs("Received unexpected words")
+ .containsExactly("Hello", "World");
+ }
+
+
+ static class Listener
+ {
+ final List<String> keys = new LinkedList<>();
+ final List<String> words = new LinkedList<>();
+
+ @KafkaListener(groupId = "peter", topics = TOPIC_OUT)
+ public void receive(ConsumerRecord<String, String> record)
+ {
+ keys.add(record.key());
+ words.add(record.value());
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Listener listener()
+ {
+ return new Listener();
+ }
+ }