popular: 1.0.0 - Renamed packages and classes -- ALIGN
authorKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 21:22:38 +0000 (23:22 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 17 Jun 2024 19:19:47 +0000 (21:19 +0200)
16 files changed:
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/popular/User.java
src/main/java/de/juplo/kafka/wordcount/popular/Word.java
src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java
src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java
src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java
src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java
src/test/resources/logback-test.xml

index e6d3b1f..602f3f6 100644 (file)
@@ -1,14 +1,14 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 
 @SpringBootApplication
-public class CounterApplication
+public class PopularApplication
 {
        public static void main(String[] args)
        {
-               SpringApplication.run(CounterApplication.class, args);
+               SpringApplication.run(PopularApplication.class, args);
        }
 }
index 174521f..1c02197 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -16,18 +16,18 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
 @Slf4j
-public class CounterApplicationConfiguriation
+public class PopularApplicationConfiguriation
 {
        @Bean
        public Properties streamProcessorProperties(
-                       CounterApplicationProperties counterProperties)
+                       PopularApplicationProperties counterProperties)
        {
                Properties propertyMap = serializationConfig();
 
@@ -50,19 +50,19 @@ public class CounterApplicationConfiguriation
 
                propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
+               propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, PopularApplication.class.getPackageName());
 
                return propertyMap;
        }
 
        @Bean(initMethod = "start", destroyMethod = "stop")
-       public CounterStreamProcessor streamProcessor(
-                       CounterApplicationProperties applicationProperties,
+       public PopularStreamProcessor streamProcessor(
+                       PopularApplicationProperties applicationProperties,
                        Properties streamProcessorProperties,
                        KeyValueBytesStoreSupplier storeSupplier,
                        ConfigurableApplicationContext context)
        {
-               CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+               PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
                                streamProcessorProperties,
index c3ada17..12f55f5 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 
 import lombok.Getter;
@@ -7,16 +7,16 @@ import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 
-@ConfigurationProperties("juplo.wordcount.counter")
+@ConfigurationProperties("juplo.wordcount.popular")
 @Getter
 @Setter
 @ToString
-public class CounterApplicationProperties
+public class PopularApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
-  private String applicationId = "counter";
+  private String applicationId = "popular";
   private String inputTopic = "words";
-  private String outputTopic = "countings";
+  private String outputTopic = "popular";
   private Integer commitInterval;
   private Integer cacheMaxBytes;
 }
index 2304e55..7bba240 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.*;
@@ -17,21 +17,21 @@ import java.util.stream.Collectors;
 
 
 @Slf4j
-public class CounterStreamProcessor
+public class PopularStreamProcessor
 {
-       public static final String STORE_NAME = "counter";
+       public static final String STORE_NAME = "popular";
 
 
        public final KafkaStreams streams;
 
 
-       public CounterStreamProcessor(
+       public PopularStreamProcessor(
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
                        KeyValueBytesStoreSupplier storeSupplier)
        {
-               Topology topology = CounterStreamProcessor.buildTopology(
+               Topology topology = PopularStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
                                storeSupplier);
index e38bcba..6dbe261 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.Data;
index 77287d5..92c1d6d 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.Data;
index f1fce71..39eebf0 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
index d940f22..059b521 100644 (file)
@@ -1,5 +1,5 @@
 server:
-  port: 8083
+  port: 8087
 management:
   endpoints:
     web:
index 0faa2de..a327389 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -26,9 +26,9 @@ import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -41,17 +41,17 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.stats.OutputWindowedWord,counter:de.juplo.kafka.wordcount.stats.OutputWordCounter",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
-                               "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.counter.commit-interval=100",
-                               "juplo.wordcount.counter.cache-max-bytes=0",
-                               "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
-                               "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+                               "juplo.wordcount.popular.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.popular.commit-interval=100",
+                               "juplo.wordcount.popular.cache-max-bytes=0",
+                               "juplo.wordcount.popular.input-topic=" + TOPIC_IN,
+                               "juplo.wordcount.popular.output-topic=" + TOPIC_OUT })
 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
 @Slf4j
-public class CounterApplicationIT
+public class PopularApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
@@ -59,12 +59,12 @@ public class CounterApplicationIT
        @Autowired
        Consumer consumer;
        @Autowired
-       CounterStreamProcessor streamProcessor;
+       PopularStreamProcessor streamProcessor;
 
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
+                       @Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
        {
                TestData
                                .getInputMessages()
@@ -72,7 +72,7 @@ public class CounterApplicationIT
                                {
                                        try
                                        {
-                                               SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
@@ -129,19 +129,19 @@ public class CounterApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<OutputWindowedWord, OutputWordCounter> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
-                               @Payload TestOutputWordCounter counter)
+                               @Header(KafkaHeaders.RECEIVED_KEY) OutputWindowedWord word,
+                               @Payload OutputWordCounter counter)
                {
                        log.debug("Received message: {} -> {}", word, counter);
                        received.add(word, counter);
                }
 
                synchronized void enforceAssertion(
-                               java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
+                               java.util.function.Consumer<MultiValueMap<OutputWindowedWord, OutputWordCounter>> assertion)
                {
                        assertion.accept(received);
                }
index e80e383..3e71d3e 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
@@ -22,34 +22,34 @@ import org.springframework.util.MultiValueMap;
 
 import java.util.Map;
 
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
 
 
 @Slf4j
-public class CounterStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
 
 
   static TopologyTestDriver testDriver;
-  static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+  static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
 
 
   @BeforeAll
   public static void setUpTestDriver()
   {
-    Topology topology = CounterStreamProcessor.buildTopology(
+    Topology topology = PopularStreamProcessor.buildTopology(
         IN,
         OUT,
         Stores.inMemoryKeyValueStore(STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
-    TestInputTopic<TestInputUser, TestInputWord> in =
+    TestInputTopic<InputUser, InputWord> in =
         testDriver.createInputTopic(IN, serializer(), serializer());
-    TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
+    TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
 
     TestData
@@ -104,12 +104,12 @@ public class CounterStreamProcessorTopologyTest
     return new JsonSerializer().noTypeInfo();
   }
 
-  private static JsonDeserializer<TestOutputWord> keyDeserializer()
+  private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
   {
     return deserializer(true);
   }
 
-  private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+  private static JsonDeserializer<OutputWordCounter> valueDeserializer()
   {
     return deserializer(false);
   }
@@ -125,6 +125,6 @@ public class CounterStreamProcessorTopologyTest
 
   private static String typeMappingsConfig()
   {
-    return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+    return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
   }
 }
index 862eb2b..892ea81 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
@@ -25,55 +25,55 @@ class TestData
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
-       static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
-       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
-       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
-       static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+       static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
+       static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
+       static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
+       static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
+       static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
 
-       private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+       private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_HALLO)),
+                                       InputUser.of(PETER),
+                                       InputWord.of(PETER, WORD_HALLO)),
                        KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
+                                       InputUser.of(KLAUS),
+                                       InputWord.of(KLAUS, WORD_MÜSCH)),
                        KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_WELT)),
+                                       InputUser.of(PETER),
+                                       InputWord.of(PETER, WORD_WELT)),
                        KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
+                                       InputUser.of(KLAUS),
+                                       InputWord.of(KLAUS, WORD_MÜSCH)),
                        KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
+                                       InputUser.of(KLAUS),
+                                       InputWord.of(KLAUS, WORD_S)),
                        KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
+                                       InputUser.of(PETER),
+                                       InputWord.of(PETER, WORD_BOÄH)),
                        KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_WELT)),
+                                       InputUser.of(PETER),
+                                       InputWord.of(PETER, WORD_WELT)),
                        KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
+                                       InputUser.of(PETER),
+                                       InputWord.of(PETER, WORD_BOÄH)),
                        KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
+                                       InputUser.of(KLAUS),
+                                       InputWord.of(KLAUS, WORD_S)),
                        KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
+                                       InputUser.of(PETER),
+                                       InputWord.of(PETER, WORD_BOÄH)),
                        KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
+                                       InputUser.of(KLAUS),
+                                       InputWord.of(KLAUS, WORD_S)),
        };
 
-       static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
+       static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
        {
                return Stream.of(TestData.INPUT_MESSAGES);
        }
 
-       static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+       static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
                expectedMessages().forEach(
                                (word, counter) ->
@@ -81,7 +81,7 @@ class TestData
                                                                .containsExactlyElementsOf(counter));
        }
 
-       static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+       static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
                assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
                assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
@@ -90,7 +90,7 @@ class TestData
                assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
        }
 
-       private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
+       private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
        {
                return messagesForUsers.get(word) == null
                                ? 0
@@ -106,17 +106,17 @@ class TestData
                assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
        }
 
-       private static Word wordOf(TestOutputWord testOutputWord)
+       private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
        {
                Word word = new Word();
 
-               word.setUser(testOutputWord.getUser());
-               word.setWord(testOutputWord.getWord());
+               word.setUser(testOutputWindowedWord.getUser());
+               word.setWord(testOutputWindowedWord.getWord());
 
                return word;
        }
 
-       static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+       static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
                assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
                assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
@@ -126,10 +126,10 @@ class TestData
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
-                       TestOutputWord word,
+                       OutputWindowedWord word,
                        Long counter)
        {
-               TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+               OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
                                word.getUser(),
                                word.getWord(),
                                counter);
@@ -137,20 +137,20 @@ class TestData
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
-                       TestOutputWord word,
-                       TestOutputWordCounter counter)
+                       OutputWindowedWord word,
+                       OutputWordCounter counter)
        {
                assertThat(counter).isEqualTo(getLastMessageFor(word));
        }
 
-       private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+       private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
        {
                return getLastMessageFor(word, expectedMessages());
        }
 
-       private static TestOutputWordCounter getLastMessageFor(
-                       TestOutputWord user,
-                       MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+       private static OutputWordCounter getLastMessageFor(
+                       OutputWindowedWord user,
+                       MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
        {
                return messagesForWord
                                .get(user)
@@ -158,46 +158,46 @@ class TestData
                                .reduce(null, (left, right) -> right);
        }
 
-       private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+       private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
                                        PETER_HALLO,
-                                       TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
+                                       OutputWordCounter.of(PETER, WORD_HALLO,1)),
                        KeyValue.pair(
                                        KLAUS_MÜSCH,
-                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
                        KeyValue.pair(
                                        PETER_WELT,
-                                       TestOutputWordCounter.of(PETER, WORD_WELT,1)),
+                                       OutputWordCounter.of(PETER, WORD_WELT,1)),
                        KeyValue.pair(
                                        KLAUS_MÜSCH,
-                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
                        KeyValue.pair(
                                        KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,1)),
+                                       OutputWordCounter.of(KLAUS, WORD_S,1)),
                        KeyValue.pair(
                                        PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
+                                       OutputWordCounter.of(PETER, WORD_BOÄH,1)),
                        KeyValue.pair(
                                        PETER_WELT,
-                                       TestOutputWordCounter.of(PETER, WORD_WELT,2)),
+                                       OutputWordCounter.of(PETER, WORD_WELT,2)),
                        KeyValue.pair(
                                        PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
+                                       OutputWordCounter.of(PETER, WORD_BOÄH,2)),
                        KeyValue.pair(
                                        KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,2)),
+                                       OutputWordCounter.of(KLAUS, WORD_S,2)),
                        KeyValue.pair(
                                        PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
+                                       OutputWordCounter.of(PETER, WORD_BOÄH,3)),
                        KeyValue.pair(
                                        KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,3)),
+                                       OutputWordCounter.of(KLAUS, WORD_S,3)),
        };
 
-       static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
+       static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
        {
-               MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
index 2255b61..483baaa 100644 (file)
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
-public class TestInputUser
+public class InputUser
 {
   String user;
 }
index 71ed1d9..53b6af2 100644 (file)
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
-public class TestInputWord
+public class InputWord
 {
   String user;
   String word;
index cfc2cae..83288fa 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
-public class TestOutputWord
+public class OutputWindowedWord
 {
   String user;
   String word;
index 1b59387..b5d2499 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
+public class OutputWordCounter
 {
   String user;
   String word;
index 171bf63..0d0c912 100644 (file)
@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
     <include resource="org/springframework/boot/logging/logback/base.xml" />
-    <logger name="de.juplo.kafka.wordcount.counter" level="DEBUG" />
+    <logger name="de.juplo.kafka.wordcount.popular" level="DEBUG" />
 </configuration>