From 7b44a323729b84acf64bce5d6ca18910755f8081 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 12 Feb 2023 11:01:31 +0100 Subject: [PATCH] WIP --- .../juplo/kafka/wordcount/counter/Word.java | 4 ++ .../counter/CounterApplicationIT.java | 39 +--------------- .../CounterStreamProcessorTopologyTest.java | 6 +-- .../kafka/wordcount/counter/Message.java | 4 +- .../kafka/wordcount/counter/TestData.java | 44 +++++++++---------- 5 files changed, 33 insertions(+), 64 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java index 77287d5..4aa5ee2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java @@ -1,9 +1,13 @@ package de.juplo.kafka.wordcount.counter; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor @Data @JsonIgnoreProperties(ignoreUnknown = true) public class Word diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 8ed4206..c7c2acb 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -76,41 +76,6 @@ public class CounterApplicationIT } }); - Message peter1 = Message.of( - Key.of("peter", "Hallo"), - WordCount.of("peter", "Hallo", 1l)); - Message peter2 = Message.of( - Key.of("peter", "Welt"), - WordCount.of("peter", "Welt", 1l)); - Message peter3 = Message.of( - Key.of("peter", "Boäh"), - WordCount.of("peter", "Boäh", 1l)); - Message peter4 = Message.of( - Key.of("peter", "Boäh"), - WordCount.of("peter", "Boäh", 2l)); - Message peter5 = Message.of( - Key.of("peter", "Boäh"), - WordCount.of("peter", "Boäh", 3l)); - Message peter6 = Message.of( - Key.of("peter", "Welt"), - WordCount.of("peter", "Welt", 2l)); - - Message klaus1 = Message.of( - Key.of("klaus", "Müsch"), - WordCount.of("klaus", "Müsch", 1l)); - Message klaus2 = Message.of( - Key.of("klaus", "Müsch"), - WordCount.of("klaus", "Müsch", 2l)); - Message klaus3 = Message.of( - Key.of("klaus", "s"), - WordCount.of("klaus", "s", 1l)); - Message klaus4 = Message.of( - Key.of("klaus", "s"), - WordCount.of("klaus", "s", 2l)); - Message klaus5 = Message.of( - Key.of("klaus", "s"), - WordCount.of("klaus", "s", 3l)); - await("Expexted converted data") .atMost(Duration.ofSeconds(10)) .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages())); @@ -127,9 +92,9 @@ public class CounterApplicationIT public synchronized void receive(ConsumerRecord record) throws JsonProcessingException { log.debug("Received message: {}", record); - Key key = mapper.readValue(record.key(), Key.class); + Word key = mapper.readValue(record.key(), Word.class); WordCount value = mapper.readValue(record.value(), WordCount.class); - received.add(key.getUser(), Message.of(key,value)); + received.add(Message.of(key,value)); } synchronized List getReceivedMessages() diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 4881c40..e65819d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -50,10 +50,10 @@ public class CounterStreamProcessorTopologyTest new StringSerializer(), new StringSerializer()); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, - new JsonDeserializer(Key.class).ignoreTypeHeaders(), - new StringDeserializer()); + new JsonDeserializer<>(Word.class).ignoreTypeHeaders(), + new JsonDeserializer<>(WordCount.class).ignoreTypeHeaders()); TestData.writeInputData((key, value) -> in.pipeInput(key, value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java b/src/test/java/de/juplo/kafka/wordcount/counter/Message.java index 15dcbae..eb64e6d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/Message.java @@ -6,6 +6,6 @@ import lombok.Value; @Value(staticConstructor = "of") public class Message { - Key key; - String value; + Word key; + WordCount value; } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index ae18246..4814303 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -50,37 +50,37 @@ class TestData static Message[] expectedMessages = { Message.of( - Key.of("peter", "Hallo"), - "1"), + Word.of("peter", "Hallo"), + WordCount.of("peter", "Hallo", 1l)), Message.of( - Key.of("klaus", "Müsch"), - "1"), + Word.of("klaus", "Müsch"), + WordCount.of("klaus", "Müsch", 1l)), Message.of( - Key.of("peter", "Welt"), - "1"), + Word.of("peter", "Welt"), + WordCount.of("peter", "Welt", 1l)), Message.of( - Key.of("klaus", "Müsch"), - "2"), + Word.of("klaus", "Müsch"), + WordCount.of("klaus", "Müsch", 2l)), Message.of( - Key.of("klaus", "s"), - "1"), + Word.of("klaus", "s"), + WordCount.of("klaus", "s", 1l)), Message.of( - Key.of("peter", "Boäh"), - "1"), + Word.of("peter", "Boäh"), + WordCount.of("peter", "Boäh", 1l)), Message.of( - Key.of("peter", "Welt"), - "2"), + Word.of("peter", "Welt"), + WordCount.of("peter", "Welt", 2l)), Message.of( - Key.of("peter", "Boäh"), - "2"), + Word.of("peter", "Boäh"), + WordCount.of("peter", "Boäh", 2l)), Message.of( - Key.of("klaus", "s"), - "2"), + Word.of("klaus", "s"), + WordCount.of("klaus", "s", 2l)), Message.of( - Key.of("peter", "Boäh"), - "3"), + Word.of("peter", "Boäh"), + WordCount.of("peter", "Boäh", 3l)), Message.of( - Key.of("klaus", "s"), - "3"), + Word.of("klaus", "s"), + WordCount.of("klaus", "s", 3l)) }; } -- 2.20.1