From 4e10b6e106409b2884fcef67f7a5ded0581bc35f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 13 May 2024 19:14:07 +0200 Subject: [PATCH] counter: 1.2.10 - Replaced helper-class `Message` with `KeyValue` --- pom.xml | 2 +- .../counter/CounterApplicationIT.java | 7 +++-- .../CounterStreamProcessorTopologyTest.java | 9 ++---- .../kafka/wordcount/counter/Message.java | 11 -------- .../kafka/wordcount/counter/TestData.java | 28 ++++++++++--------- 5 files changed, 23 insertions(+), 34 deletions(-) delete mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/Message.java diff --git a/pom.xml b/pom.xml index a710d47..2d65a21 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.2.9 + 1.2.10 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 158c6ae..fea89ab 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeEach; @@ -75,16 +76,16 @@ public class CounterApplicationIT @RequiredArgsConstructor static class Consumer { - private final List received = new LinkedList<>(); + private final List> received = new LinkedList<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive(ConsumerRecord record) { log.debug("Received message: {}", record); - received.add(Message.of(record.key(),record.value())); + received.add(KeyValue.pair(record.key(),record.value())); } - synchronized List getReceivedMessages() + synchronized List> getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 42ca78b..1bdfd48 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -1,9 +1,6 @@ package de.juplo.kafka.wordcount.counter; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; @@ -61,10 +58,10 @@ public class CounterStreamProcessorTopologyTest TestData.writeInputData((key, value) -> in.pipeInput(key, value)); - List receivedMessages = out + List> receivedMessages = out .readRecordsToList() .stream() - .map(record -> Message.of(record.key(), record.value())) + .map(record -> KeyValue.pair(record.key(), record.value())) .toList(); TestData.assertExpectedResult(receivedMessages); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java b/src/test/java/de/juplo/kafka/wordcount/counter/Message.java deleted file mode 100644 index eb64e6d..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Message -{ - Word key; - WordCount value; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 4a65a78..5798fc3 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -1,5 +1,7 @@ package de.juplo.kafka.wordcount.counter; +import org.apache.kafka.streams.KeyValue; + import java.util.List; import java.util.function.BiConsumer; @@ -45,7 +47,7 @@ class TestData Word.of("klaus","s")); } - static void assertExpectedResult(List receivedMessages) + static void assertExpectedResult(List> receivedMessages) { assertThat(receivedMessages).hasSize(11); assertThat(receivedMessages).containsSubsequence( @@ -66,39 +68,39 @@ class TestData expectedMessages[9]); // Boäh } - static Message[] expectedMessages = + static KeyValue[] expectedMessages = new KeyValue[] { - Message.of( + KeyValue.pair( Word.of("peter","Hallo"), WordCount.of("peter","Hallo",1)), - Message.of( + KeyValue.pair( Word.of("klaus","Müsch"), WordCount.of("klaus","Müsch",1)), - Message.of( + KeyValue.pair( Word.of("peter","Welt"), WordCount.of("peter","Welt",1)), - Message.of( + KeyValue.pair( Word.of("klaus","Müsch"), WordCount.of("klaus","Müsch",2)), - Message.of( + KeyValue.pair( Word.of("klaus","s"), WordCount.of("klaus","s",1)), - Message.of( + KeyValue.pair( Word.of("peter","Boäh"), WordCount.of("peter","Boäh",1)), - Message.of( + KeyValue.pair( Word.of("peter","Welt"), WordCount.of("peter","Welt",2)), - Message.of( + KeyValue.pair( Word.of("peter","Boäh"), WordCount.of("peter","Boäh",2)), - Message.of( + KeyValue.pair( Word.of("klaus","s"), WordCount.of("klaus","s",2)), - Message.of( + KeyValue.pair( Word.of("peter","Boäh"), WordCount.of("peter","Boäh",3)), - Message.of( + KeyValue.pair( Word.of("klaus","s"), WordCount.of("klaus","s",3)), }; -- 2.20.1