counter: 1.3.1 - Refined `CounterStreamProcessor` (serde-config)
authorKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 19:08:01 +0000 (21:08 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 19:18:32 +0000 (21:18 +0200)
* Refactored the creation of the ``JsonSerde``s, that are used to consume
  the incomming messages.
* All special ``Serdes``, that are used for incomming and outgoing messages,
  are created in separted methods now.
* Removed unnecessary operatorx in the ``Materialized``-configuration for
  the state store (the operator is not necessary, because no headers are
  present, when deserializing from a store).

src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java

index 97d460f..c983a25 100644 (file)
@@ -47,17 +47,13 @@ public class CounterStreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .stream(
-                                               inputTopic,
-                                               Consumed.with(
-                                                               new JsonSerde<>(User.class),
-                                                               new JsonSerde<>(Word.class)))
+                               .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
                                .map((key, word) -> new KeyValue<>(word, word))
                                .groupByKey()
                                .count(
                                                Materialized
                                                                .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
+                                                               .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
                                .toStream()
                                .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
@@ -87,6 +83,16 @@ public class CounterStreamProcessor
 
 
 
+       public static JsonSerde<User> inKeySerde()
+       {
+               return new JsonSerde<>(User.class);
+       }
+
+       public static JsonSerde<Word> inValueSerde()
+       {
+               return new JsonSerde<>(Word.class);
+       }
+
        public static JsonSerde<Word> outKeySerde()
        {
                return serde(true);