import java.util.Properties;
import java.util.stream.Stream;
-import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
@Slf4j
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
- Top10ApplicationConfiguration applicationConfiguriation =
- new Top10ApplicationConfiguration();
- Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+ Map<String, Object> propertyMap = serializationConfig();
+
+ Properties properties = new Properties();
+ properties.putAll(propertyMap);
JsonSerde<?> keySerde = new JsonSerde<>();
keySerde.configure(propertyMap, true);
JsonSerde<?> valueSerde = new JsonSerde<>();
valueSerde.configure(propertyMap, false);
- testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+ testDriver = new TopologyTestDriver(topology, properties);
in = testDriver.createInputTopic(
IN,
MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
- .forEach(record ->
- {
- log.debug(
- "OUT: {} -> {}, {}, {}",
- record.key(),
- record.value(),
- parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
- parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
- receivedMessages.add(record.key(), record.value());
- });
+ .forEach(record -> receivedMessages.add(record.key(), record.value()));
TestData.assertExpectedMessages(receivedMessages);