From 7e0f4ca6927d9b62909b0789b4b14a46f8dbd0f5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 14:55:17 +0200 Subject: [PATCH] top10: 1.1.0 - Simplified topology, using JsonSerde - Defined `JsonSerde` as default for keys and values. - Removed the configuration of specific serdes from all steps of the processor-topology. - Added type-mappings for serialization/deserialization. --- pom.xml | 6 +- .../de/juplo/kafka/wordcount/top10/Entry.java | 17 ++++-- .../de/juplo/kafka/wordcount/top10/Key.java | 14 +++-- .../juplo/kafka/wordcount/top10/Ranking.java | 23 +++++--- .../top10/Top10ApplicationConfiguration.java | 23 +++++--- .../wordcount/top10/Top10StreamProcessor.java | 55 +++++++------------ 6 files changed, 78 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index cb7ec14..02cf701 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.0.3 + 1.1.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example @@ -31,6 +31,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index 67f45f2..b25fc07 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -1,11 +1,20 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Value; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; -@Value(staticConstructor = "of") +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private final String word; - private final Long count; + private String word; + private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index d09dbcc..ffac8ea 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -1,13 +1,17 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; -@Getter -@Setter +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String username; + private String user; private String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index b748fe5..80e8742 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -1,26 +1,26 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import lombok.*; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -@Getter -@Setter +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data public class Ranking { private Entry[] entries = new Entry[0]; - public void add(Entry newEntry) + public Ranking add(Entry newEntry) { if (entries.length == 0) { entries = new Entry[1]; entries[0] = newEntry; - return; + return this; } List list = new LinkedList<>(Arrays.asList(entries)); @@ -29,7 +29,7 @@ public class Ranking Entry entry; entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) + if (entry.getCounter() <= newEntry.getCounter()) { list.add(i, newEntry); for (int j = i+1; j < list.size(); j++) @@ -46,8 +46,15 @@ public class Ranking list = list.subList(0,10); } entries = list.toArray(num -> new Entry[num]); - return; + return this; } } + + return this; + } + + public static Ranking of(Entry... entries) + { + return new Ranking(entries); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 3ea85b8..7749917 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -1,16 +1,16 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -30,8 +30,19 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "word:" + Key.class.getName() + "," + + "counter:" + Entry.class.getName()); + props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); + props.put( + JsonSerializer.TYPE_MAPPINGS, + "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; @@ -40,14 +51,12 @@ public class Top10ApplicationConfiguration @Bean(initMethod = "start", destroyMethod = "stop") public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, - ObjectMapper objectMapper, Properties streamProcessorProperties, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - objectMapper, streamProcessorProperties); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index f0a7d19..a3900bf 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,11 +1,10 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import java.util.Properties; @@ -19,49 +18,35 @@ public class Top10StreamProcessor public Top10StreamProcessor( String inputTopic, String outputTopic, - ObjectMapper mapper, Properties props) + { + Topology topology = Top10StreamProcessor.buildTopology( + inputTopic, + outputTopic); + + streams = new KafkaStreams(topology, props); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(inputTopic) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + .stream(inputTopic) + .map((key, entry) -> new KeyValue<>(key.getUser(), entry)) .groupByKey() .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) + () -> new Ranking(), + (user, entry, ranking) -> ranking.add(entry)) .toStream() .to(outputTopic); - streams = new KafkaStreams(builder.build(), props); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start() -- 2.20.1