query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryApplicationConfiguration.java
index 3f866f0..440d5c4 100644 (file)
@@ -1,20 +1,34 @@
 package de.juplo.kafka.wordcount.query;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Configuration
 @EnableConfigurationProperties(QueryApplicationProperties.class)
+@Slf4j
 public class QueryApplicationConfiguration
 {
        @Bean
@@ -41,19 +55,87 @@ public class QueryApplicationConfiguration
        }
 
        @Bean
-       public QueryStreamProcessor streamProcessor(
+       public Properties streamProcessorProperties(
                        QueryApplicationProperties applicationProperties,
+                       HostInfo applicationServer)
+       {
+               Properties props = new Properties();
+
+               props.putAll(serializationConfig());
+
+               String applicationId = applicationProperties.getApplicationId();
+               String bootstrapServer = applicationProperties.getBootstrapServer();
+
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
+               if (applicationProperties.getCommitInterval() != null)
+                       props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval());
+               if (applicationProperties.getCacheMaxBytes() != null)
+                       props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes());
+
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return props;
+       }
+
+       static Properties serializationConfig()
+       {
+               Properties props = new Properties();
+
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "user:" + Key.class.getName() + "," +
+                               "ranking:" + Ranking.class.getName() + "," +
+                               "userranking:" + UserRanking.class.getName());
+
+               return props;
+       }
+
+       @Bean(initMethod = "start", destroyMethod = "stop")
+       public QueryStreamProcessor streamProcessor(
+                       Properties streamProcessorProperties,
                        HostInfo applicationServer,
-                       ObjectMapper mapper,
+                       QueryApplicationProperties applicationProperties,
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier,
                        ConfigurableApplicationContext context)
        {
-               return new QueryStreamProcessor(
-                               applicationProperties.getApplicationId(),
+               QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+                               streamProcessorProperties,
                                applicationServer,
-                               applicationProperties.getBootstrapServer(),
                                applicationProperties.getUsersInputTopic(),
                                applicationProperties.getRankingInputTopic(),
-                               mapper,
-                               context);
+                               userStoreSupplier,
+                               rankingStoreSupplier);
+
+               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+               return streamProcessor;
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier userStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(USER_STORE_NAME);
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier rankingStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
        }
 }