X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationConfiguration.java;h=7da171227a67dcd2472b6908435a2a8acb6bd976;hb=278c7b8125c82120e1d80fa640bd16d375d4bf86;hp=50a53645ceb860b9cdb1a8f555b2ac1fba48c623;hpb=57f47fca712981116b726e437f589ac727c5d0a7;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 50a5364..7da1712 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -14,6 +13,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; import java.io.IOException; import java.net.InetSocketAddress; @@ -78,7 +79,14 @@ public class QueryApplicationConfiguration Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this! + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userdata:" + User.class.getName() + "," + + "userranking:" + UserRanking.class.getName()); return props; } @@ -89,7 +97,6 @@ public class QueryApplicationConfiguration HostInfo applicationServer, QueryApplicationProperties applicationProperties, KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -97,8 +104,7 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier, - mapper); + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> {