package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
@Slf4j
TopologyTestDriver testDriver;
- TestInputTopic<String, Word> in;
- TestOutputTopic<Word, WordCounter> out;
+ TestInputTopic<String, TestInputWord> in;
+ TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
@BeforeEach
OUT,
Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
- CounterApplicationConfiguriation applicationConfiguriation =
- new CounterApplicationConfiguriation();
- Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
-
- JsonSerde<?> keySerde = new JsonSerde<>();
- keySerde.configure(propertyMap, true);
- JsonSerde<?> valueSerde = new JsonSerde<>();
- valueSerde.configure(propertyMap, false);
-
- testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
in = testDriver.createInputTopic(
IN,
- (JsonSerializer<String>)keySerde.serializer(),
- (JsonSerializer<Word>)valueSerde.serializer());
+ new StringSerializer(),
+ new JsonSerializer().noTypeInfo());
out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<Word>)keySerde.deserializer(),
- (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+ new JsonDeserializer()
+ .copyWithType(TestOutputWord.class)
+ .ignoreTypeHeaders(),
+ new JsonDeserializer()
+ .copyWithType(TestOutputWordCounter.class)
+ .ignoreTypeHeaders());
}
@Test
public void test()
{
- TestData.injectInputMessages((key, value) -> in.pipeInput(key, value));
+ TestData
+ .getInputMessages()
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
- MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
- .forEach(record ->
- {
- log.debug(
- "OUT: {} -> {}, {}, {}",
- record.key(),
- record.value(),
- parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
- parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
- receivedMessages.add(record.key(), record.value());
- });
+ .forEach(record -> receivedMessages.add(record.key(), record.value()));
TestData.assertExpectedMessages(receivedMessages);
}