import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@Bean(initMethod = "start", destroyMethod = "stop")
public CounterStreamProcessor streamProcessor(
CounterApplicationProperties properties,
+ KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper objectMapper,
ConfigurableApplicationContext context)
{
properties.getInputTopic(),
properties.getOutputTopic(),
propertyMap,
+ storeSupplier,
objectMapper);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
return streamProcessor;
}
+ @Bean
+ public KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.persistentKeyValueStore("counter");
+ }
+
public static void main(String[] args)
{
SpringApplication.run(CounterApplication.class, args);