KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper mapper)
{
- Topology topology =
- CounterStreamProcessor.buildTopology(inputTopic, outputTopic, mapper);
+ Topology topology = CounterStreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic,
+ storeSupplier,
+ mapper);
+
streams = new KafkaStreams(topology, properties);
}
static Topology buildTopology(
String inputTopic,
String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper mapper)
{
StreamsBuilder builder = new StreamsBuilder();
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.LinkedMultiValueMap;
@Test
public void test()
{
- ObjectMapper mapper = new ObjectMapper();
- Topology topology = CounterStreamProcessor.buildTopology(IN, OUT, mapper);
+ Topology topology = CounterStreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
+ new ObjectMapper());
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");