top10: 1.2.1 - Removed logging of type-headers in tests
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
index 1becd65..84cfb1e 100644 (file)
@@ -20,10 +20,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Stream;
 
-import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
 
 
 @Slf4j
@@ -47,18 +44,17 @@ public class Top10StreamProcessorTopologyTest
         OUT,
         Stores.inMemoryKeyValueStore(STORE_NAME));
 
-    Top10ApplicationConfiguration applicationConfiguriation =
-        new Top10ApplicationConfiguration();
-    Properties streamProcessorProperties =
-        applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
-    Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+    Map<String, Object> propertyMap = serializationConfig();
+
+    Properties properties = new Properties();
+    properties.putAll(propertyMap);
 
     JsonSerde<?> keySerde = new JsonSerde<>();
     keySerde.configure(propertyMap, true);
     JsonSerde<?> valueSerde = new JsonSerde<>();
     valueSerde.configure(propertyMap, false);
 
-    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+    testDriver = new TopologyTestDriver(topology, properties);
 
     in = testDriver.createInputTopic(
         IN,
@@ -85,19 +81,13 @@ public class Top10StreamProcessorTopologyTest
     MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
     out
         .readRecordsToList()
-        .forEach(record ->
-        {
-          log.debug(
-              "OUT: {} -> {}, {}, {}",
-              record.key(),
-              record.value(),
-              parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
-              parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
-          receivedMessages.add(record.key(), record.value());
-        });
+        .forEach(record -> receivedMessages.add(record.key(), record.value()));
 
     TestData.assertExpectedMessages(receivedMessages);
 
+    TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
+    TestData.assertExpectedLastMessagesForUsers(receivedMessages);
+
     KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
     TestData.assertExpectedState(store);
   }