counter: 1.2.10 - Replaced helper-class `Message` with `KeyValue` counter-1.2.10
authorKai Moritz <kai@juplo.de>
Mon, 13 May 2024 17:14:07 +0000 (19:14 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:34:44 +0000 (22:34 +0200)
pom.xml
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/Message.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

diff --git a/pom.xml b/pom.xml
index a710d47..2d65a21 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.2.9</version>
+       <version>1.2.10</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 158c6ae..fea89ab 100644 (file)
@@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
@@ -75,16 +76,16 @@ public class CounterApplicationIT
        @RequiredArgsConstructor
        static class Consumer
        {
-               private final List<Message> received = new LinkedList<>();
+               private final List<KeyValue<Word, WordCount>> received = new LinkedList<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(ConsumerRecord<Word, WordCount> record)
                {
                        log.debug("Received message: {}", record);
-                       received.add(Message.of(record.key(),record.value()));
+                       received.add(KeyValue.pair(record.key(),record.value()));
                }
 
-               synchronized List<Message> getReceivedMessages()
+               synchronized List<KeyValue<Word, WordCount>> getReceivedMessages()
                {
                        return received;
                }
index 42ca78b..1bdfd48 100644 (file)
@@ -1,9 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
@@ -61,10 +58,10 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<Message> receivedMessages = out
+    List<KeyValue<Word,WordCount>> receivedMessages = out
         .readRecordsToList()
         .stream()
-        .map(record -> Message.of(record.key(), record.value()))
+        .map(record -> KeyValue.pair(record.key(), record.value()))
         .toList();
 
     TestData.assertExpectedResult(receivedMessages);
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java b/src/test/java/de/juplo/kafka/wordcount/counter/Message.java
deleted file mode 100644 (file)
index eb64e6d..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class Message
-{
-  Word key;
-  WordCount value;
-}
index 4a65a78..5798fc3 100644 (file)
@@ -1,5 +1,7 @@
 package de.juplo.kafka.wordcount.counter;
 
+import org.apache.kafka.streams.KeyValue;
+
 import java.util.List;
 import java.util.function.BiConsumer;
 
@@ -45,7 +47,7 @@ class TestData
                                Word.of("klaus","s"));
        }
 
-       static void assertExpectedResult(List<Message> receivedMessages)
+       static void assertExpectedResult(List<KeyValue<Word, WordCount>> receivedMessages)
        {
                assertThat(receivedMessages).hasSize(11);
                assertThat(receivedMessages).containsSubsequence(
@@ -66,39 +68,39 @@ class TestData
                                expectedMessages[9]); // Boäh
        }
 
-       static Message[] expectedMessages =
+       static KeyValue<Word,WordCount>[] expectedMessages = new KeyValue[]
        {
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("peter","Hallo"),
                                        WordCount.of("peter","Hallo",1)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("klaus","Müsch"),
                                        WordCount.of("klaus","Müsch",1)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("peter","Welt"),
                                        WordCount.of("peter","Welt",1)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("klaus","Müsch"),
                                        WordCount.of("klaus","Müsch",2)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("klaus","s"),
                                        WordCount.of("klaus","s",1)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("peter","Boäh"),
                                        WordCount.of("peter","Boäh",1)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("peter","Welt"),
                                        WordCount.of("peter","Welt",2)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("peter","Boäh"),
                                        WordCount.of("peter","Boäh",2)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("klaus","s"),
                                        WordCount.of("klaus","s",2)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("peter","Boäh"),
                                        WordCount.of("peter","Boäh",3)),
-                       Message.of(
+                       KeyValue.pair(
                                        Word.of("klaus","s"),
                                        WordCount.of("klaus","s",3)),
        };