counter: Refined assertions in `CounterApplicationIT` counter
authorKai Moritz <kai@juplo.de>
Sun, 26 May 2024 20:59:26 +0000 (22:59 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 27 May 2024 18:35:56 +0000 (20:35 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

index b664814..e435d0b 100644 (file)
@@ -1,8 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
@@ -14,18 +12,18 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
 
 import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
 import static org.awaitility.Awaitility.await;
 import static org.awaitility.Awaitility.await;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
 
 
 @SpringBootTest(
 
 
 @SpringBootTest(
@@ -79,21 +77,18 @@ public class CounterApplicationIT
 
        static class Consumer
        {
 
        static class Consumer
        {
-               private final List<KeyValue<Word, WordCounter>> received = new LinkedList<>();
+               private final MultiValueMap<Word, WordCounter> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
+               public synchronized void receive(
+                               @Header(KafkaHeaders.RECEIVED_KEY) Word word,
+                               @Payload WordCounter counter)
                {
                {
-                       log.debug(
-                                       "Received message: {} -> {}, key: {}, value: {}",
-                                       record.key(),
-                                       record.value(),
-                                       parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
-                                       parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
-                       received.add(KeyValue.pair(record.key(),record.value()));
+                       log.debug("Received message: {} -> {}", word, counter);
+                       received.add(word, counter);
                }
 
                }
 
-               synchronized List<KeyValue<Word, WordCounter>> getReceivedMessages()
+               synchronized MultiValueMap<Word, WordCounter> getReceivedMessages()
                {
                        return received;
                }
                {
                        return received;
                }
index ca2664e..5b9f365 100644 (file)
@@ -1,14 +1,18 @@
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import java.util.Map;
 import java.util.Properties;
 
@@ -57,10 +61,10 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<KeyValue<Word, WordCounter>> receivedMessages = out
+    MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
+    out
         .readRecordsToList()
         .readRecordsToList()
-        .stream()
-        .map(record ->
+        .forEach(record ->
         {
           log.debug(
               "OUT: {} -> {}, {}, {}",
         {
           log.debug(
               "OUT: {} -> {}, {}, {}",
@@ -68,9 +72,8 @@ public class CounterStreamProcessorTopologyTest
               record.value(),
               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
               record.value(),
               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
-          return KeyValue.pair(record.key(), record.value());
-        })
-        .toList();
+          receivedMessages.add(record.key(), record.value());
+        });
 
     TestData.assertExpectedResult(receivedMessages);
   }
 
     TestData.assertExpectedResult(receivedMessages);
   }
index 19443ac..c71ad94 100644 (file)
@@ -3,14 +3,19 @@ package de.juplo.kafka.wordcount.counter;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 
 
 class TestData
 
 
 class TestData
@@ -52,29 +57,17 @@ class TestData
                                Word.of("klaus","s"));
        }
 
                                Word.of("klaus","s"));
        }
 
-       static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
+       static void assertExpectedResult(MultiValueMap<Word, WordCounter> receivedMessages)
        {
        {
-               assertThat(receivedMessages).hasSize(11);
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[0]); // Hallo
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[1],
-                               expectedMessages[3]); // Müsch
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[2],
-                               expectedMessages[6]);
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[4],
-                               expectedMessages[8],
-                               expectedMessages[10]); // s
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[5],
-                               expectedMessages[7],
-                               expectedMessages[9]); // Boäh
+               MultiValueMap<Word, WordCounter> expected = new LinkedMultiValueMap<>();
+               expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
+               await("Received expected messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> expected.forEach((word, counter) ->
+                                               assertThat(receivedMessages.get(word)).containsExactlyElementsOf(counter)));
        }
 
        }
 
-       static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
-       {
+       static Stream<KeyValue<Word, WordCounter>> expectedMessages = Stream.of(
                        KeyValue.pair(
                                        Word.of("peter","Hallo"),
                                        WordCounter.of("peter","Hallo",1)),
                        KeyValue.pair(
                                        Word.of("peter","Hallo"),
                                        WordCounter.of("peter","Hallo",1)),
@@ -107,8 +100,7 @@ class TestData
                                        WordCounter.of("peter","Boäh",3)),
                        KeyValue.pair(
                                        Word.of("klaus","s"),
                                        WordCounter.of("peter","Boäh",3)),
                        KeyValue.pair(
                                        Word.of("klaus","s"),
-                                       WordCounter.of("klaus","s",3)),
-       };
+                                       WordCounter.of("klaus","s",3)));
 
        static Map<String, Object> convertToMap(Properties properties)
        {
 
        static Map<String, Object> convertToMap(Properties properties)
        {