+ return props;
+ }
+
+ static Properties serializationConfig()
+ {
+ Properties props = new Properties();
+
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Stats.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
+ props.put(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "key:" + Key.class.getName() + "," +
+ "counter:" + Entry.class.getName() + "," +
+ "stats:" + Stats.class.getName() + "," +
+ "ranking:" + Ranking.class.getName());
+
+ return props;
+ }
+
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public Top10StreamProcessor streamProcessor(
+ Top10ApplicationProperties applicationProperties,
+ Properties streamProcessorProperties,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ConfigurableApplicationContext context)
+ {
+ Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+ applicationProperties.getInputTopic(),
+ applicationProperties.getOutputTopic(),
+ streamProcessorProperties,
+ storeSupplier);
+
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->