counter: 1.2.15 - Refined `TestData.assertExpectedResult(..)`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
index ca2664e..5b9f365 100644 (file)
@@ -1,14 +1,18 @@
 package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -57,10 +61,10 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
 
-    List<KeyValue<Word, WordCounter>> receivedMessages = out
+    MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
+    out
         .readRecordsToList()
-        .stream()
-        .map(record ->
+        .forEach(record ->
         {
           log.debug(
               "OUT: {} -> {}, {}, {}",
@@ -68,9 +72,8 @@ public class CounterStreamProcessorTopologyTest
               record.value(),
               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
-          return KeyValue.pair(record.key(), record.value());
-        })
-        .toList();
+          receivedMessages.add(record.key(), record.value());
+        });
 
     TestData.assertExpectedResult(receivedMessages);
   }