WIP
authorKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 06:58:18 +0000 (07:58 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 12 Feb 2023 14:15:45 +0000 (15:15 +0100)
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/Key.java
src/main/java/de/juplo/kafka/wordcount/counter/Word.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java

index 20cb4d2..7c0a783 100644 (file)
@@ -12,6 +12,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -35,8 +38,15 @@ public class CounterApplication
 
                propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
                propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
-               propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Key.class.getPackageName());
+               propertyMap.put(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "W=" + Word.class.getName() + "," +
+                               "K=" + Key.class.getName() + "," +
+                               "C=" + WordCount.class.getName());
+               propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
                propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
                if (properties.getCommitInterval() != null)
                        propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
index 4002df2..07baedb 100644 (file)
@@ -1,8 +1,8 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -10,6 +10,9 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.util.Properties;
 
@@ -44,27 +47,44 @@ public class CounterStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KStream<String, String> source = builder.stream(inputTopic);
+               KStream<String, Word> source = builder.stream(
+                               inputTopic,
+                               Consumed.with(
+                                               Serdes.String(),
+                                               new JsonSerde<>(Word.class)
+                                                               .ignoreTypeHeaders()));
+
                source
-                               .map((username, word) ->
-                               {
-                                       try
-                                       {
-                                               String key = mapper.writeValueAsString(Key.of(username, word));
-                                               return new KeyValue<>(key, word);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               })
-                               .groupByKey()
-                               .count(Materialized.as(storeSupplier))
-                               .mapValues(value->Long.toString(value))
+                               .map((key, word) -> new KeyValue<>(word, word))
+                               .groupByKey(Grouped.with(
+                                               new JsonSerde<>(Word.class)
+                                                               .forKeys()
+                                                               .noTypeInfo(),
+                                               new JsonSerde<>(Word.class)
+                                                               .noTypeInfo()))
+                               .count(Materialized
+                                               .<Word,Long>as(storeSupplier)
+                                               .withKeySerde(
+                                                               new JsonSerde<>(Word.class)
+                                                                               .forKeys()
+                                                                               .noTypeInfo())
+                                               .withValueSerde(
+                                                               Serdes.Long()))
                                .toStream()
-                               .to(outputTopic);
+                               .map((word, count) -> new KeyValue<>(word, WordCount.of(word.getUser(), word.getWord(), count)))
+                               .to(
+                                               outputTopic,
+                                               Produced.with(
+                                                               new JsonSerde<>(Word.class)
+                                                                               .forKeys()
+                                                                               .noTypeInfo(),
+                                                               new JsonSerde<>(WordCount.class)
+                                                                               .noTypeInfo()));
+
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
 
-               return builder.build();
+               return topology;
        }
 
        public void start()
index f926efe..5a805c6 100644 (file)
@@ -11,6 +11,6 @@ import lombok.Value;
 @NoArgsConstructor
 public class Key
 {
-  private String username;
+  private String user;
   private String word;
 }
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
new file mode 100644 (file)
index 0000000..77287d5
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
new file mode 100644 (file)
index 0000000..44ccb2d
--- /dev/null
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class WordCount
+{
+  String user;
+  String word;
+  long count;
+}
index a345935..8ed4206 100644 (file)
@@ -61,7 +61,55 @@ public class CounterApplicationIT
        @Test
        void testSendMessage() throws Exception
        {
-               TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value));
+               TestData.writeInputData((key, value) ->
+               {
+                       try
+                       {
+                               Word word = new Word();
+                               word.setUser("peter");
+                               word.setWord("Hallo");
+                               kafkaTemplate.send(TOPIC_IN, word.getUser(), mapper.writeValueAsString(word));
+                       }
+                       catch (JsonProcessingException e)
+                       {
+                               throw new RuntimeException(e);
+                       }
+               });
+
+               Message peter1 = Message.of(
+                               Key.of("peter", "Hallo"),
+                               WordCount.of("peter", "Hallo", 1l));
+               Message peter2 = Message.of(
+                               Key.of("peter", "Welt"),
+                               WordCount.of("peter", "Welt", 1l));
+               Message peter3 = Message.of(
+                               Key.of("peter", "Boäh"),
+                               WordCount.of("peter", "Boäh", 1l));
+               Message peter4 = Message.of(
+                               Key.of("peter", "Boäh"),
+                               WordCount.of("peter", "Boäh", 2l));
+               Message peter5 = Message.of(
+                               Key.of("peter", "Boäh"),
+                               WordCount.of("peter", "Boäh", 3l));
+               Message peter6 = Message.of(
+                               Key.of("peter", "Welt"),
+                               WordCount.of("peter", "Welt", 2l));
+
+               Message klaus1 = Message.of(
+                               Key.of("klaus", "Müsch"),
+                               WordCount.of("klaus", "Müsch", 1l));
+               Message klaus2 = Message.of(
+                               Key.of("klaus", "Müsch"),
+                               WordCount.of("klaus", "Müsch", 2l));
+               Message klaus3 = Message.of(
+                               Key.of("klaus", "s"),
+                               WordCount.of("klaus", "s", 1l));
+               Message klaus4 = Message.of(
+                               Key.of("klaus", "s"),
+                               WordCount.of("klaus", "s", 2l));
+               Message klaus5 = Message.of(
+                               Key.of("klaus", "s"),
+                               WordCount.of("klaus", "s", 3l));
 
                await("Expexted converted data")
                                .atMost(Duration.ofSeconds(10))
@@ -80,7 +128,8 @@ public class CounterApplicationIT
                {
                        log.debug("Received message: {}", record);
                        Key key = mapper.readValue(record.key(), Key.class);
-                       received.add(Message.of(key,record.value()));
+                       WordCount value = mapper.readValue(record.value(), WordCount.class);
+                       received.add(key.getUser(), Message.of(key,value));
                }
 
                synchronized List<Message> getReceivedMessages()