From: Kai Moritz Date: Thu, 9 May 2024 07:23:05 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=1bfbd6c6fe3c35e0cd5470db632faf1a655487af;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/pom.xml b/pom.xml index b46a871..657330a 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 2ae3ec5..a5a2673 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -13,6 +13,9 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.stereotype.Component; import java.util.Properties; @@ -78,8 +81,13 @@ public class Top10StreamProcessor Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + props.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, WordCount.class.getName()); + props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streams = new KafkaStreams(builder.build(), props); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Word.java b/src/main/java/de/juplo/kafka/wordcount/top10/Word.java new file mode 100644 index 0000000..e4662bc --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Word.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@NoArgsConstructor +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Word +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java index 1dbbed4..69c8bed 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java @@ -1,10 +1,14 @@ package de.juplo.kafka.wordcount.top10; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; +import lombok.NoArgsConstructor; +@NoArgsConstructor @Data +@JsonIgnoreProperties(ignoreUnknown = true) public class WordCount { String user;