]> juplo.de Git - demos/kafka/wordcount/commitdiff
counter: 1.3.0 - (RED) Introduced domain-class `User` as key
authorKai Moritz <kai@juplo.de>
Wed, 5 Jun 2024 19:30:17 +0000 (21:30 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:33:30 +0000 (13:33 +0200)
* _GREEN:_ The `CounterApplicationIT` does _not_ reveal the bug!
* _RED:_ The `CounterStreamProcessorToplogyTest` fails with an exception,
  that gives a hint for the cause of the bug.
* The bug is caused by missing type-specifications for the operation
  ``cout()``.
* Before the introduction of the domain-class `User` everything worked as
  expected, because the class `Word` could be specified as default for
  the deserialization of the key.
** With the introduction of the domain-class `User` as key of the incoming
   messages, the default for the key has to switched to this class, to
   enable the application to deserialize incomming keys despite the missing
   type mapping.
** Beforehand, the default `Word` covered the missing type information
   for the ``count()``-operator.

pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/User.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 3adeb56bbef55590030a783bb5740ce878bc1624..58597367454ea83685ca4d67fce8a736ceb663d5 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.2.15</version>
+       <version>1.3.0</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 6872d5d053e628a52f0cbd4035159fcd73768315..484b8de13e69bd44806c18e51a063cdb5ef55cc4 100644 (file)
@@ -50,10 +50,11 @@ public class CounterApplicationConfiguriation
 
                propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
+               propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
                propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
                propertyMap.put(
                                JsonDeserializer.TYPE_MAPPINGS,
+                               "user:" + User.class.getName() + "," +
                                "word:" + Word.class.getName() + "," +
                                "counter:" + WordCounter.class.getName());
 
index 712ab6573be5b7d555a1d96efc895b14bd0e457e..fd5c5a74061e474133d9cbd42c92a7e1016dafe6 100644 (file)
@@ -1,9 +1,7 @@
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -43,9 +41,7 @@ public class CounterStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KStream<String, Word> source = builder.stream(
-                               inputTopic,
-                               Consumed.with(Serdes.String(), null));
+               KStream<User, Word> source = builder.stream(inputTopic);
 
                source
                                .map((key, word) -> new KeyValue<>(word, word))
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java
new file mode 100644 (file)
index 0000000..e38bcba
--- /dev/null
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+  String user;
+}
index 9995ce7f938fc4abedefc78b153e451afdc0cd35..1bfceed7a9466d84f67035e79f7dfeab2459de49 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
 import de.juplo.kafka.wordcount.splitter.TestInputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
@@ -34,6 +35,7 @@ import static org.awaitility.Awaitility.await;
 
 @SpringBootTest(
                properties = {
+                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.properties.spring.json.add.type.headers=false",
                                "spring.kafka.consumer.auto-offset-reset=earliest",
@@ -62,7 +64,7 @@ public class CounterApplicationIT
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
+                       @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
        {
                TestData
                                .getInputMessages()
@@ -70,7 +72,7 @@ public class CounterApplicationIT
                                {
                                        try
                                        {
-                                               SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
index 6e244e26c4d3c6ed8043f820a968c7a8a17b40ca..0ffd516895639461455b792f4e5d30a511db596f 100644 (file)
@@ -1,10 +1,10 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
 import de.juplo.kafka.wordcount.splitter.TestInputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
@@ -31,7 +31,7 @@ public class CounterStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<String, TestInputWord> in;
+  TestInputTopic<TestInputUser, TestInputWord> in;
   TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
 
 
@@ -47,7 +47,7 @@ public class CounterStreamProcessorTopologyTest
 
     in = testDriver.createInputTopic(
         IN,
-        new StringSerializer(),
+        new JsonSerializer().noTypeInfo(),
         new JsonSerializer().noTypeInfo());
 
     out = testDriver.createOutputTopic(
index 7446db6b8a11b32fcde42e8e417a0fe9bb544917..54e62878eec602a682a53dce9ba60315b94070d9 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
 import de.juplo.kafka.wordcount.splitter.TestInputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
@@ -30,44 +31,44 @@ class TestData
        static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
        static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
 
-       private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+       private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       PETER,
+                                       TestInputUser.of(PETER),
                                        TestInputWord.of(PETER, WORD_HALLO)),
                        new KeyValue<>(
-                                       KLAUS,
+                                       TestInputUser.of(KLAUS),
                                        TestInputWord.of(KLAUS, WORD_MÜSCH)),
                        new KeyValue<>(
-                                       PETER,
+                                       TestInputUser.of(PETER),
                                        TestInputWord.of(PETER, WORD_WELT)),
                        new KeyValue<>(
-                                       KLAUS,
+                                       TestInputUser.of(KLAUS),
                                        TestInputWord.of(KLAUS, WORD_MÜSCH)),
                        new KeyValue<>(
-                                       KLAUS,
+                                       TestInputUser.of(KLAUS),
                                        TestInputWord.of(KLAUS, WORD_S)),
                        new KeyValue<>(
-                                       PETER,
+                                       TestInputUser.of(PETER),
                                        TestInputWord.of(PETER, WORD_BOÄH)),
                        new KeyValue<>(
-                                       PETER,
+                                       TestInputUser.of(PETER),
                                        TestInputWord.of(PETER, WORD_WELT)),
                        new KeyValue<>(
-                                       PETER,
+                                       TestInputUser.of(PETER),
                                        TestInputWord.of(PETER, WORD_BOÄH)),
                        new KeyValue<>(
-                                       KLAUS,
+                                       TestInputUser.of(KLAUS),
                                        TestInputWord.of(KLAUS, WORD_S)),
                        new KeyValue<>(
-                                       PETER,
+                                       TestInputUser.of(PETER),
                                        TestInputWord.of(PETER, WORD_BOÄH)),
                        new KeyValue<>(
-                                       KLAUS,
+                                       TestInputUser.of(KLAUS),
                                        TestInputWord.of(KLAUS, WORD_S)),
        };
 
-       static Stream<KeyValue<String, TestInputWord>> getInputMessages()
+       static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
        {
                return Stream.of(TestData.INPUT_MESSAGES);
        }
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java
new file mode 100644 (file)
index 0000000..2255b61
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+  String user;
+}