From 1681b51f04bea0e9898ed75ab7f0d0c229b7ec31 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 9 May 2024 09:23:05 +0200 Subject: [PATCH] WIP --- pom.xml | 4 ++++ .../top10/Top10ApplicationConfiguration.java | 14 ++++++++++---- .../java/de/juplo/kafka/wordcount/top10/Word.java | 15 +++++++++++++++ .../de/juplo/kafka/wordcount/top10/WordCount.java | 4 ++++ 4 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Word.java diff --git a/pom.xml b/pom.xml index cb7ec14..6ccc687 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 3ea85b8..7b10ccd 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -3,14 +3,15 @@ package de.juplo.kafka.wordcount.top10; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -30,8 +31,13 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + props.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, WordCount.class.getName()); + props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Word.java b/src/main/java/de/juplo/kafka/wordcount/top10/Word.java new file mode 100644 index 0000000..e4662bc --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Word.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@NoArgsConstructor +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Word +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java index 1dbbed4..69c8bed 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java @@ -1,10 +1,14 @@ package de.juplo.kafka.wordcount.top10; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; +import lombok.NoArgsConstructor; +@NoArgsConstructor @Data +@JsonIgnoreProperties(ignoreUnknown = true) public class WordCount { String user; -- 2.20.1