From 35943a021ac4b25fad29861ecc083b6974ea2732 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 May 2024 21:54:28 +0200 Subject: [PATCH] counter: 1.2.12 - Renamed `WordCount` into `WordCounter` -- ALIGN --- .../counter/CounterStreamProcessor.java | 2 +- .../kafka/wordcount/counter/WordCounter.java | 8 +++--- .../counter/CounterApplicationIT.java | 6 ++--- .../CounterStreamProcessorTopologyTest.java | 6 ++--- .../kafka/wordcount/counter/TestData.java | 26 +++++++++---------- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 3dd8c0f..b1343a7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -50,7 +50,7 @@ public class CounterStreamProcessor .groupByKey() .count(Materialized.as(storeSupplier)) .toStream() - .map((word, count) -> new KeyValue<>(word, WordCount.of(word, count))) + .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) .to(outputTopic); Topology topology = builder.build(); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java index 958f9b7..1334e5b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java @@ -8,14 +8,14 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class WordCount +public class WordCounter { String user; String word; - long count; + long counter; - public static WordCount of(Word word, long count) + public static WordCounter of(Word word, long counter) { - return new WordCount(word.getUser(), word.getWord(), count); + return new WordCounter(word.getUser(), word.getWord(), counter); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index e0f4672..5a3507a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -76,16 +76,16 @@ public class CounterApplicationIT @RequiredArgsConstructor static class Consumer { - private final List> received = new LinkedList<>(); + private final List> received = new LinkedList<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive(ConsumerRecord record) + public synchronized void receive(ConsumerRecord record) { log.debug("Received message: {}", record); received.add(KeyValue.pair(record.key(),record.value())); } - synchronized List> getReceivedMessages() + synchronized List> getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 159123b..b785dfa 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -45,14 +45,14 @@ public class CounterStreamProcessorTopologyTest (JsonSerializer)keySerde.serializer(), (JsonSerializer)valueSerde.serializer()); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, (JsonDeserializer)keySerde.deserializer(), - (JsonDeserializer)valueSerde.deserializer()); + (JsonDeserializer)valueSerde.deserializer()); TestData.writeInputData((key, value) -> in.pipeInput(key, value)); - List> receivedMessages = out + List> receivedMessages = out .readRecordsToList() .stream() .map(record -> KeyValue.pair(record.key(), record.value())) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index c1dd45a..43e1919 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -50,7 +50,7 @@ class TestData Word.of("klaus","s")); } - static void assertExpectedResult(List> receivedMessages) + static void assertExpectedResult(List> receivedMessages) { assertThat(receivedMessages).hasSize(11); assertThat(receivedMessages).containsSubsequence( @@ -71,41 +71,41 @@ class TestData expectedMessages[9]); // Boäh } - static KeyValue[] expectedMessages = new KeyValue[] + static KeyValue[] expectedMessages = new KeyValue[] { KeyValue.pair( Word.of("peter","Hallo"), - WordCount.of("peter","Hallo",1)), + WordCounter.of("peter","Hallo",1)), KeyValue.pair( Word.of("klaus","Müsch"), - WordCount.of("klaus","Müsch",1)), + WordCounter.of("klaus","Müsch",1)), KeyValue.pair( Word.of("peter","Welt"), - WordCount.of("peter","Welt",1)), + WordCounter.of("peter","Welt",1)), KeyValue.pair( Word.of("klaus","Müsch"), - WordCount.of("klaus","Müsch",2)), + WordCounter.of("klaus","Müsch",2)), KeyValue.pair( Word.of("klaus","s"), - WordCount.of("klaus","s",1)), + WordCounter.of("klaus","s",1)), KeyValue.pair( Word.of("peter","Boäh"), - WordCount.of("peter","Boäh",1)), + WordCounter.of("peter","Boäh",1)), KeyValue.pair( Word.of("peter","Welt"), - WordCount.of("peter","Welt",2)), + WordCounter.of("peter","Welt",2)), KeyValue.pair( Word.of("peter","Boäh"), - WordCount.of("peter","Boäh",2)), + WordCounter.of("peter","Boäh",2)), KeyValue.pair( Word.of("klaus","s"), - WordCount.of("klaus","s",2)), + WordCounter.of("klaus","s",2)), KeyValue.pair( Word.of("peter","Boäh"), - WordCount.of("peter","Boäh",3)), + WordCounter.of("peter","Boäh",3)), KeyValue.pair( Word.of("klaus","s"), - WordCount.of("klaus","s",3)), + WordCounter.of("klaus","s",3)), }; static Map convertToMap(Properties properties) -- 2.20.1