import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.List;
import java.util.Properties;
OUT,
Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
- CounterApplicationConfiguriation config =
+ CounterApplicationConfiguriation applicationConfiguriation =
new CounterApplicationConfiguriation();
- Properties properties =
- config.propertyMap(new CounterApplicationProperties());
+ Properties streamProcessorProperties =
+ applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
- TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
- TestInputTopic<String, String> in = testDriver.createInputTopic(
+ TestInputTopic<String, Word> in = testDriver.createInputTopic(
IN,
- new StringSerializer(),
- new StringSerializer());
+ new JsonSerializer<>(),
+ new JsonSerializer<>());
TestOutputTopic<String, String> out = testDriver.createOutputTopic(
OUT,