From 6536eb76056262f388580f8762cdfbbf4275d5b2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Feb 2023 06:45:54 +0100 Subject: [PATCH] alt --- pom.xml | 20 ++++++++++++ .../counter/CounterStreamProcessor.java | 31 ++++++++++--------- .../de/juplo/kafka/wordcount/counter/Key.java | 2 +- .../juplo/kafka/wordcount/counter/Word.java | 13 ++++++++ .../kafka/wordcount/counter/WordCount.java | 12 +++++++ src/test/resources/logback-test.xml | 5 +++ 6 files changed, 67 insertions(+), 16 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/Word.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java create mode 100644 src/test/resources/logback-test.xml diff --git a/pom.xml b/pom.xml index 1d2212c..6e1cf81 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot @@ -46,11 +50,27 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index d529541..ed54c95 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -6,6 +6,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; @@ -19,28 +21,27 @@ public class CounterStreamProcessor public CounterStreamProcessor( String inputTopic, String outputTopic, - Properties properties) + Properties properties, + ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(properties.getInputTopic()); + KStream source = builder.stream(inputTopic); source - .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + .map((key, word) -> new KeyValue<>(Key.of(word.getUser(), word.getWord()), word)) .groupByKey() .count() - .mapValues(value->Long.toString(value)) .toStream() + .map((key, count) -> new KeyValue<>(key, WordCount.of(key.getUser(), key.getWord(), count))) + .to( + outputTopic, + Produced.with( + new JsonSerde<>(Key.class) + .forKeys() + .noTypeInfo(), + new JsonSerde<>(WordCount.class) + .noTypeInfo())); + streams = new KafkaStreams(builder.build(), properties); } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java index 1e00dca..137fcb2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java @@ -6,6 +6,6 @@ import lombok.Value; @Value(staticConstructor = "of") public class Key { - private final String username; + private final String user; private final String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java new file mode 100644 index 0000000..77287d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Word +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java new file mode 100644 index 0000000..44ccb2d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class WordCount +{ + String user; + String word; + long count; +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 0000000..171bf63 --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,5 @@ + + + + + -- 2.20.1