import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@Slf4j
public class CounterApplication
{
- @Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties properties,
- ObjectMapper objectMapper,
- ConfigurableApplicationContext context)
+ @Bean
+ public Properties propertyMap(CounterApplicationProperties properties)
{
Properties propertyMap = new Properties();
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
- propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
- propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ if (properties.getCommitInterval() != null)
+ propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
+ if (properties.getCacheMaxBytes() != null)
+ propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes());
propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return propertyMap;
+ }
+
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public CounterStreamProcessor streamProcessor(
+ CounterApplicationProperties properties,
+ Properties propertyMap,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ObjectMapper objectMapper,
+ ConfigurableApplicationContext context)
+ {
CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
properties.getInputTopic(),
properties.getOutputTopic(),
propertyMap,
+ storeSupplier,
objectMapper);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
return streamProcessor;
}
+ @Bean
+ public KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.persistentKeyValueStore("counter");
+ }
+
public static void main(String[] args)
{
SpringApplication.run(CounterApplication.class, args);