popular: 1.0.0 - Renamed packages and classes -- ALIGN
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularApplicationIT.java
index 0faa2de..a327389 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -26,9 +26,9 @@ import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -41,17 +41,17 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.stats.OutputWindowedWord,counter:de.juplo.kafka.wordcount.stats.OutputWordCounter",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
-                               "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.counter.commit-interval=100",
-                               "juplo.wordcount.counter.cache-max-bytes=0",
-                               "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
-                               "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+                               "juplo.wordcount.popular.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.popular.commit-interval=100",
+                               "juplo.wordcount.popular.cache-max-bytes=0",
+                               "juplo.wordcount.popular.input-topic=" + TOPIC_IN,
+                               "juplo.wordcount.popular.output-topic=" + TOPIC_OUT })
 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
 @Slf4j
-public class CounterApplicationIT
+public class PopularApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
@@ -59,12 +59,12 @@ public class CounterApplicationIT
        @Autowired
        Consumer consumer;
        @Autowired
-       CounterStreamProcessor streamProcessor;
+       PopularStreamProcessor streamProcessor;
 
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
+                       @Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
        {
                TestData
                                .getInputMessages()
@@ -72,7 +72,7 @@ public class CounterApplicationIT
                                {
                                        try
                                        {
-                                               SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
@@ -129,19 +129,19 @@ public class CounterApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<OutputWindowedWord, OutputWordCounter> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
-                               @Payload TestOutputWordCounter counter)
+                               @Header(KafkaHeaders.RECEIVED_KEY) OutputWindowedWord word,
+                               @Payload OutputWordCounter counter)
                {
                        log.debug("Received message: {} -> {}", word, counter);
                        received.add(word, counter);
                }
 
                synchronized void enforceAssertion(
-                               java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
+                               java.util.function.Consumer<MultiValueMap<OutputWindowedWord, OutputWordCounter>> assertion)
                {
                        assertion.accept(received);
                }