From cb4a5de56f142603eb63e82f6769616bf5ff058a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 17 Feb 2023 14:48:23 +0100 Subject: [PATCH] counter: 1.2.3 - Simplified topology, using default-serdes for all steps - Defined `JsonSerde` as default for keys and values. - Configured the `JsonDeserializer` for header-based typing. - Removed the configuration of specific serdes from all steps of the processor-topology, except the initial step, that needs a `StringSerde` for the key. --- pom.xml | 2 +- .../CounterApplicationConfiguriation.java | 15 +++++++++--- .../counter/CounterStreamProcessor.java | 23 +++---------------- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index 15a07c1..cbccc73 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.2.2 + 1.2.3 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index f1da407..1bcc834 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -3,7 +3,6 @@ package de.juplo.kafka.wordcount.counter; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -12,6 +11,9 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import 
org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -31,8 +33,15 @@ public class CounterApplicationConfiguriation propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName()); + propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); + propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); + propertyMap.put( + JsonDeserializer.TYPE_MAPPINGS, + "W:" + Word.class.getName() + "," + + "C:" + WordCount.class.getName()); propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); if (properties.getCommitInterval() != null) propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 4cc0c68..d64eb68 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -8,10 +8,8 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import 
org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.kstream.*; -import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; @@ -48,30 +46,15 @@ public class CounterStreamProcessor KStream source = builder.stream( inputTopic, - Consumed.with( - Serdes.String(), - new JsonSerde<>(Word.class) - .ignoreTypeHeaders())); + Consumed.with(Serdes.String(), null)); source .map((key, word) -> new KeyValue<>(word, word)) - .groupByKey(Grouped.with( - new JsonSerde<>(Word.class) - .forKeys() - .noTypeInfo(), - new JsonSerde<>(Word.class) - .noTypeInfo())) + .groupByKey() .count(Materialized.as(storeSupplier)) .toStream() .map((word, count) -> new KeyValue<>(word, WordCount.of(word, count))) - .to( - outputTopic, - Produced.with( - new JsonSerde<>(Word.class) - .forKeys() - .noTypeInfo(), - new JsonSerde<>(WordCount.class) - .noTypeInfo())); + .to(outputTopic); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); -- 2.20.1