X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationTests.java;h=3fa5851f061bad65b88c5e38e9f591fde26d8ab0;hb=520528638e7487c845d7fb0f39066ce7d249297b;hp=9b1960f7284a1b9fbc38ca5bfb273a1db3f50e1d;hpb=f132033a6793fc566962a4361d7d0a0e852b0f82;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java index 9b1960f..3fa5851 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java @@ -12,10 +12,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.LinkedList; -import java.util.List; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationTests.*; import static org.assertj.core.api.Assertions.assertThat; @@ -57,31 +57,48 @@ public class SplitterApplicationTests Recording recording = new Recording(); recording.setUser("peter"); recording.setSentence("Hallo Welt!"); - kafkaTemplate.send(TOPIC_IN, mapper.writeValueAsString(recording)); - - String word1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); - String word2 = mapper.writeValueAsString(Word.of("peter", "Welt")); + kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + recording.setUser("klaus"); + recording.setSentence("Müsch gäb's auch!"); + kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + recording.setUser("peter"); + recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); + kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + + String peter1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); + String peter2 = mapper.writeValueAsString(Word.of("peter", "Welt")); + String peter3 = mapper.writeValueAsString(Word.of("peter", "Boäh")); + String peter4 = mapper.writeValueAsString(Word.of("peter", "echt")); + String peter5 = mapper.writeValueAsString(Word.of("peter", "ß")); + String peter6 = mapper.writeValueAsString(Word.of("peter", "mal")); + String peter7 = mapper.writeValueAsString(Word.of("peter", "nä")); + String peter8 = mapper.writeValueAsString(Word.of("peter", "Nümmäh")); + + String klaus1 = mapper.writeValueAsString(Word.of("klaus","Müsch")); + String klaus2 = mapper.writeValueAsString(Word.of("klaus","gäb")); + String klaus3 = mapper.writeValueAsString(Word.of("klaus","s")); + String klaus4 = mapper.writeValueAsString(Word.of("klaus","auch")); await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> { assertThat(consumer.received).hasSize(2); - assertThat(consumer.received.get(0).value()).isEqualTo(word1); - assertThat(consumer.received.get(1).value()).isEqualTo(word2); + assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4); + assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6, peter7, peter8); }); } static class Consumer { - final List> received = new LinkedList<>(); + final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public void receive(ConsumerRecord record) { log.debug("Received message: {}", record); - received.add(record); + received.add(record.key(), record.value()); } }