import java.util.Properties;
import java.util.stream.Stream;
-import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
- Top10ApplicationConfiguration applicationConfiguriation =
- new Top10ApplicationConfiguration();
- Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+ Map<String, Object> propertyMap = serializationConfig();
+
+ Properties properties = new Properties();
+ properties.putAll(propertyMap);
JsonSerde<?> keySerde = new JsonSerde<>();
keySerde.configure(propertyMap, true);
JsonSerde<?> valueSerde = new JsonSerde<>();
valueSerde.configure(propertyMap, false);
- testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+ testDriver = new TopologyTestDriver(topology, properties);
in = testDriver.createInputTopic(
IN,
TestData.assertExpectedMessages(receivedMessages);
+ TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
TestData.assertExpectedLastMessagesForUsers(receivedMessages);
KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);