From: Kai Moritz Date: Wed, 5 Jun 2024 19:30:17 +0000 (+0200) Subject: counter: 1.3.0 - (RED) Introduced domain-class `User` as key X-Git-Tag: popular-on-counter~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e6198710fe679bc7463d1b17c9d9dc311062ef31;p=demos%2Fkafka%2Fwordcount counter: 1.3.0 - (RED) Introduced domain-class `User` as key * _GREEN:_ The `CounterApplicationIT` does _not_ reveal the bug! * _RED:_ The `CounterStreamProcessorToplogyTest` fails with an exception, that gives a hint for the cause of the bug. * The bug is caused by missing type-specifications for the operation ``cout()``. * Before the introduction of the domain-class `User` everything worked as expected, because the class `Word` could be specified as default for the deserialization of the key. ** With the introduction of the domain-class `User` as key of the incoming messages, the default for the key has to switched to this class, to enable the application to deserialize incomming keys despite the missing type mapping. ** Beforehand, the default `Word` covered the missing type information for the ``count()``-operator. --- diff --git a/pom.xml b/pom.xml index 3adeb56..5859736 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.2.15 + 1.3.0 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 6872d5d..484b8de 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -50,10 +50,11 @@ public class CounterApplicationConfiguriation propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); + propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); propertyMap.put( JsonDeserializer.TYPE_MAPPINGS, + "user:" + User.class.getName() + "," + "word:" + Word.class.getName() + "," + "counter:" + WordCounter.class.getName()); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 712ab65..fd5c5a7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -1,9 +1,7 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -43,9 +41,7 @@ public class CounterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream( - inputTopic, - Consumed.with(Serdes.String(), null)); + KStream source = builder.stream(inputTopic); source .map((key, word) -> new KeyValue<>(word, word)) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java new file mode 100644 index 0000000..e38bcba --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/User.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 9995ce7..1bfceed 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; @@ -34,6 +35,7 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", @@ -62,7 +64,7 @@ public class CounterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -70,7 +72,7 @@ public class CounterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 6e244e2..0ffd516 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -1,10 +1,10 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; @@ -31,7 +31,7 @@ public class CounterStreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic in; + TestInputTopic in; TestOutputTopic out; @@ -47,7 +47,7 @@ public class CounterStreamProcessorTopologyTest in = testDriver.createInputTopic( IN, - new StringSerializer(), + new JsonSerializer().noTypeInfo(), new JsonSerializer().noTypeInfo()); out = testDriver.createOutputTopic( diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 7446db6..54e6287 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; @@ -30,44 +31,44 @@ class TestData static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_HALLO)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), }; - static Stream> getInputMessages() + static Stream> getInputMessages() { return Stream.of(TestData.INPUT_MESSAGES); } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java new file mode 100644 index 0000000..2255b61 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputUser +{ + String user; +}