- @Bean
- public Properties propertyMap(CounterApplicationProperties properties)
- {
- Properties propertyMap = new Properties();
-
- propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
- propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
- propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
- if (properties.getCommitInterval() != null)
- propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
- if (properties.getCacheMaxBytes() != null)
- propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes());
- propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- return propertyMap;
- }
-
- @Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties properties,
- Properties propertyMap,
- KeyValueBytesStoreSupplier storeSupplier,
- ObjectMapper objectMapper,
- ConfigurableApplicationContext context)
- {
- CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
- properties.getInputTopic(),
- properties.getOutputTopic(),
- propertyMap,
- storeSupplier,
- objectMapper);
-
- streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
-
-
- return streamProcessor;
- }
-
- @Bean
- public KeyValueBytesStoreSupplier storeSupplier()
- {
- return Stores.persistentKeyValueStore("counter");
- }
-