From: Kai Moritz Date: Sun, 5 Feb 2023 10:47:56 +0000 (+0100) Subject: splitter: 1.1.3 - Refined test & fixed a bug in the splitting-pattern X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=520528638e7487c845d7fb0f39066ce7d249297b splitter: 1.1.3 - Refined test & fixed a bug in the splitting-pattern - The tests now sends messages for two users and asserts, that the messages are handled accordingly for each user. - The test now send messages with german umlauts and other special german characters. - Fixed a bug in the pattern, that is used for the spltting: language specific special characters were handled as non-word characters. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 86d5bbd..66188a7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -28,7 +28,7 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Component public class SplitterStreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); + final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+"); public final KafkaStreams streams; diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java index 9b1960f..3fa5851 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java @@ -12,10 +12,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.LinkedList; -import java.util.List; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationTests.*; import static org.assertj.core.api.Assertions.assertThat; @@ -57,31 +57,48 @@ public class SplitterApplicationTests Recording recording = new Recording(); recording.setUser("peter"); recording.setSentence("Hallo Welt!"); - kafkaTemplate.send(TOPIC_IN, mapper.writeValueAsString(recording)); - - String word1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); - String word2 = mapper.writeValueAsString(Word.of("peter", "Welt")); + kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + recording.setUser("klaus"); + recording.setSentence("Müsch gäb's auch!"); + kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + recording.setUser("peter"); + recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); + kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + + String peter1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); + String peter2 = mapper.writeValueAsString(Word.of("peter", "Welt")); + String peter3 = mapper.writeValueAsString(Word.of("peter", "Boäh")); + String peter4 = mapper.writeValueAsString(Word.of("peter", "echt")); + String peter5 = mapper.writeValueAsString(Word.of("peter", "ß")); + String peter6 = mapper.writeValueAsString(Word.of("peter", "mal")); + String peter7 = mapper.writeValueAsString(Word.of("peter", "nä")); + String peter8 = mapper.writeValueAsString(Word.of("peter", "Nümmäh")); + + String klaus1 = mapper.writeValueAsString(Word.of("klaus","Müsch")); + String klaus2 = mapper.writeValueAsString(Word.of("klaus","gäb")); + String klaus3 = mapper.writeValueAsString(Word.of("klaus","s")); + String klaus4 = mapper.writeValueAsString(Word.of("klaus","auch")); await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> { assertThat(consumer.received).hasSize(2); - assertThat(consumer.received.get(0).value()).isEqualTo(word1); - assertThat(consumer.received.get(1).value()).isEqualTo(word2); + assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4); + assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6, peter7, peter8); }); } static class Consumer { - final List> received = new LinkedList<>(); + final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public void receive(ConsumerRecord record) { log.debug("Received message: {}", record); - received.add(record); + received.add(record.key(), record.value()); } }