From 5bde02dfe2f22e88bf9fd6c7375e1df612056d30 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Feb 2023 07:58:18 +0100 Subject: [PATCH] WIP --- .../wordcount/counter/CounterApplication.java | 14 ++++- .../counter/CounterStreamProcessor.java | 58 +++++++++++++------ .../de/juplo/kafka/wordcount/counter/Key.java | 2 +- .../juplo/kafka/wordcount/counter/Word.java | 13 +++++ .../kafka/wordcount/counter/WordCount.java | 12 ++++ .../counter/CounterApplicationIT.java | 53 ++++++++++++++++- 6 files changed, 128 insertions(+), 24 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/Word.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 20cb4d2..7c0a783 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -12,6 +12,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -35,8 +38,15 @@ public class CounterApplication propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Key.class.getPackageName()); + propertyMap.put( + JsonDeserializer.TYPE_MAPPINGS, + "W=" + Word.class.getName() + "," + + "K=" + Key.class.getName() + "," + + "C=" + WordCount.class.getName()); + propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true); propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); if (properties.getCommitInterval() != null) propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 4002df2..07baedb 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -1,8 +1,8 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -10,6 +10,9 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.Stores; +import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; @@ -44,27 +47,44 @@ public class CounterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(inputTopic); + KStream source = builder.stream( + inputTopic, + Consumed.with( + Serdes.String(), + new JsonSerde<>(Word.class) + .ignoreTypeHeaders())); + source - .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .count(Materialized.as(storeSupplier)) - .mapValues(value->Long.toString(value)) + .map((key, word) -> new KeyValue<>(word, word)) + .groupByKey(Grouped.with( + new JsonSerde<>(Word.class) + .forKeys() + .noTypeInfo(), + new JsonSerde<>(Word.class) + .noTypeInfo())) + .count(Materialized + .as(storeSupplier) + .withKeySerde( + new JsonSerde<>(Word.class) + .forKeys() + .noTypeInfo()) + .withValueSerde( + Serdes.Long())) .toStream() - .to(outputTopic); + .map((word, count) -> new KeyValue<>(word, WordCount.of(word.getUser(), word.getWord(), count))) + .to( + outputTopic, + Produced.with( + new JsonSerde<>(Word.class) + .forKeys() + .noTypeInfo(), + new JsonSerde<>(WordCount.class) + .noTypeInfo())); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); - return builder.build(); + return topology; } public void start() diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java index f926efe..5a805c6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java @@ -11,6 +11,6 @@ import lombok.Value; @NoArgsConstructor public class Key { - private String username; + private String user; private String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java new file mode 100644 index 0000000..77287d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Word +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java new file mode 100644 index 0000000..44ccb2d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class WordCount +{ + String user; + String word; + long count; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index a345935..8ed4206 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -61,7 +61,55 @@ public class CounterApplicationIT @Test void testSendMessage() throws Exception { - TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value)); + TestData.writeInputData((key, value) -> + { + try + { + Word word = new Word(); + word.setUser("peter"); + word.setWord("Hallo"); + kafkaTemplate.send(TOPIC_IN, word.getUser(), mapper.writeValueAsString(word)); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + }); + + Message peter1 = Message.of( + Key.of("peter", "Hallo"), + WordCount.of("peter", "Hallo", 1l)); + Message peter2 = Message.of( + Key.of("peter", "Welt"), + WordCount.of("peter", "Welt", 1l)); + Message peter3 = Message.of( + Key.of("peter", "Boäh"), + WordCount.of("peter", "Boäh", 1l)); + Message peter4 = Message.of( + Key.of("peter", "Boäh"), + WordCount.of("peter", "Boäh", 2l)); + Message peter5 = Message.of( + Key.of("peter", "Boäh"), + WordCount.of("peter", "Boäh", 3l)); + Message peter6 = Message.of( + Key.of("peter", "Welt"), + WordCount.of("peter", "Welt", 2l)); + + Message klaus1 = Message.of( + Key.of("klaus", "Müsch"), + WordCount.of("klaus", "Müsch", 1l)); + Message klaus2 = Message.of( + Key.of("klaus", "Müsch"), + WordCount.of("klaus", "Müsch", 2l)); + Message klaus3 = Message.of( + Key.of("klaus", "s"), + WordCount.of("klaus", "s", 1l)); + Message klaus4 = Message.of( + Key.of("klaus", "s"), + WordCount.of("klaus", "s", 2l)); + Message klaus5 = Message.of( + Key.of("klaus", "s"), + WordCount.of("klaus", "s", 3l)); await("Expexted converted data") .atMost(Duration.ofSeconds(10)) @@ -80,7 +128,8 @@ public class CounterApplicationIT { log.debug("Received message: {}", record); Key key = mapper.readValue(record.key(), Key.class); - received.add(Message.of(key,record.value())); + WordCount value = mapper.readValue(record.value(), WordCount.class); + received.add(key.getUser(), Message.of(key,value)); } synchronized List getReceivedMessages() -- 2.20.1