counter: 1.2.15 - Separated serialization-config into a static method
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterApplicationIT.java
index ad4faf2..025a160 100644 (file)
@@ -1,5 +1,8 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -20,7 +23,6 @@ import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
-import java.util.stream.Stream;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
@@ -35,8 +37,8 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.Word",
-                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.WordCounter",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.TestOutputWord",
+                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -52,7 +54,7 @@ public class CounterApplicationIT
        public static final String TOPIC_OUT = "out";
 
        @Autowired
-       KafkaTemplate<String, Word> kafkaTemplate;
+       KafkaTemplate<String, TestInputWord> kafkaTemplate;
        @Autowired
        Consumer consumer;
 
@@ -67,8 +69,8 @@ public class CounterApplicationIT
        @Test
        void testSendMessage()
        {
-               Stream
-                               .of(TestData.INPUT_MESSAGES)
+               TestData
+                               .getInputMessages()
                                .forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));
 
                await("Expected messages")
@@ -79,18 +81,18 @@ public class CounterApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<Word, WordCounter> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) Word word,
-                               @Payload WordCounter counter)
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+                               @Payload TestOutputWordCounter counter)
                {
                        log.debug("Received message: {} -> {}", word, counter);
                        received.add(word, counter);
                }
 
-               synchronized MultiValueMap<Word, WordCounter> getReceivedMessages()
+               synchronized MultiValueMap<TestOutputWord, TestOutputWordCounter> getReceivedMessages()
                {
                        return received;
                }