counter: 1.2.15 - Refined `TestData.assertExpectedResult(..)`
authorKai Moritz <kai@juplo.de>
Sun, 26 May 2024 20:59:26 +0000 (22:59 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 11:33:30 +0000 (13:33 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

index 992164c..57ac365 100644 (file)
@@ -1,8 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
@@ -14,18 +12,18 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
 import static org.awaitility.Awaitility.await;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
 
 
 @SpringBootTest(
@@ -78,21 +76,18 @@ public class CounterApplicationIT
 
        static class Consumer
        {
-               private final List<KeyValue<Word, WordCounter>> received = new LinkedList<>();
+               private final MultiValueMap<Word, WordCounter> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
+               public synchronized void receive(
+                               @Header(KafkaHeaders.RECEIVED_KEY) Word word,
+                               @Payload WordCounter counter)
                {
-                       log.debug(
-                                       "Received message: {} -> {}, key: {}, value: {}",
-                                       record.key(),
-                                       record.value(),
-                                       parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
-                                       parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
-                       received.add(KeyValue.pair(record.key(),record.value()));
+                       log.debug("Received message: {} -> {}", word, counter);
+                       received.add(word, counter);
                }
 
-               synchronized List<KeyValue<Word, WordCounter>> getReceivedMessages()
+               synchronized MultiValueMap<Word, WordCounter> getReceivedMessages()
                {
                        return received;
                }
index ca2664e..5b9f365 100644 (file)
@@ -1,14 +1,18 @@
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -57,10 +61,10 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<KeyValue<Word, WordCounter>> receivedMessages = out
+    MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
+    out
         .readRecordsToList()
-        .stream()
-        .map(record ->
+        .forEach(record ->
         {
           log.debug(
               "OUT: {} -> {}, {}, {}",
@@ -68,9 +72,8 @@ public class CounterStreamProcessorTopologyTest
               record.value(),
               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
-          return KeyValue.pair(record.key(), record.value());
-        })
-        .toList();
+          receivedMessages.add(record.key(), record.value());
+        });
 
     TestData.assertExpectedResult(receivedMessages);
   }
index 19443ac..1687a33 100644 (file)
@@ -3,12 +3,14 @@ package de.juplo.kafka.wordcount.counter;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -52,28 +54,15 @@ class TestData
                                Word.of("klaus","s"));
        }
 
-       static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
+       static void assertExpectedResult(MultiValueMap<Word, WordCounter> receivedMessages)
        {
-               assertThat(receivedMessages).hasSize(11);
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[0]); // Hallo
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[1],
-                               expectedMessages[3]); // Müsch
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[2],
-                               expectedMessages[6]);
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[4],
-                               expectedMessages[8],
-                               expectedMessages[10]); // s
-               assertThat(receivedMessages).containsSubsequence(
-                               expectedMessages[5],
-                               expectedMessages[7],
-                               expectedMessages[9]); // Boäh
+               expectedMessages.forEach(
+                               (word, counter) ->
+                                               assertThat(receivedMessages.get(word))
+                                                               .containsExactlyElementsOf(counter));
        }
 
-       static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
+       static KeyValue<Word, WordCounter>[] expectedMessagesArray = new KeyValue[]
        {
                        KeyValue.pair(
                                        Word.of("peter","Hallo"),
@@ -110,6 +99,15 @@ class TestData
                                        WordCounter.of("klaus","s",3)),
        };
 
+       static MultiValueMap<Word, WordCounter> expectedMessages;
+       static
+       {
+               expectedMessages = new LinkedMultiValueMap<>();
+               Stream
+                               .of(expectedMessagesArray)
+                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+       }
+
        static Map<String, Object> convertToMap(Properties properties)
        {
                return properties