import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
+import java.time.Duration;
+import java.time.ZoneId;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
public PopularStreamProcessor streamProcessor(
PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
- KeyValueBytesStoreSupplier storeSupplier,
+ ZoneId zone,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
ConfigurableApplicationContext context)
{
PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
- storeSupplier);
+ zone,
+ windowBytesStoreSupplier,
+ keyValueBytesStoreSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
}
@Bean
- public KeyValueBytesStoreSupplier storeSupplier()
+ public ZoneId defaultZone()
{
- return Stores.persistentKeyValueStore(STORE_NAME);
+ return ZoneId.systemDefault();
+ }
+
+ @Bean
+ public WindowBytesStoreSupplier windowBytesStoreSupplier()
+ {
+ return Stores.persistentWindowStore(
+ KEY_VALUE_STORE_NAME,
+ Duration.ofSeconds(60),
+ Duration.ofSeconds(30),
+ true);
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(WINDOW_STORE_NAME);
}
}