-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
@Slf4j
-public class CounterApplicationConfiguriation
+public class PopularApplicationConfiguriation
{
@Bean
public Properties streamProcessorProperties(
- CounterApplicationProperties counterProperties)
+ PopularApplicationProperties counterProperties)
{
Properties propertyMap = serializationConfig();
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
+ propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, PopularApplication.class.getPackageName());
return propertyMap;
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties applicationProperties,
+ public PopularStreamProcessor streamProcessor(
+ PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
KeyValueBytesStoreSupplier storeSupplier,
ConfigurableApplicationContext context)
{
- CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+ PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,