X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationConfiguration.java;h=3bf83269854c793be79d351c2aaa1864d7f202e3;hb=fc5d6c6ee08a4b2e29a045bf4071dd0a4d86bc0d;hp=50a53645ceb860b9cdb1a8f555b2ac1fba48c623;hpb=030c0db664cea0ec3e4307065054c449dbca6be2;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 50a5364..3bf8326 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -14,6 +13,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; import java.io.IOException; import java.net.InetSocketAddress; @@ -21,7 +22,8 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -78,7 +80,13 @@ public class QueryApplicationConfiguration Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userdata:" + User.class.getName() + "," + + "userranking:" + UserRanking.class.getName()); return props; } @@ -88,8 +96,8 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -97,8 +105,8 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier, - mapper); + userStoreSupplier, + rankingStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -115,8 +123,14 @@ public class QueryApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.persistentKeyValueStore(STORE_NAME); + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } }