X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessor.java;h=811cf987147855fd443298e30a731a6c8210b48a;hb=37e0703a25b489b0258f6ef019c57e82863394e4;hp=aa21c4abbefd0c814e0f9b4c13fd5ea97bc68601;hpb=5edc5e44656c9ec5b8cc75ad22dec6ddf1a2a7c4;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index aa21c4a..811cf98 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -10,11 +10,9 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.springframework.boot.SpringApplication; @@ -23,6 +21,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.LinkedList; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -51,14 +50,13 @@ public class Top10StreamProcessor .map((key, count) -> new KeyValue<>( key.getUsername(), Entry.newBuilder().setWord(key.getWord()).setCount(count).build())) - .groupByKey() + .groupByKey(Grouped.keySerde(Serdes.String())) .aggregate( - () -> Ranking.newBuilder().build(), + () -> Ranking.newBuilder().setEntries(new LinkedList()).build(), (username, entry, ranking) -> { ranking.getEntries().add(entry); return ranking; - }, - Materialized.with(Serdes.String(), null)) + }) .toStream() .to(properties.getOutputTopic(), Produced.keySerde(Serdes.String())); @@ -70,7 +68,10 @@ public class Top10StreamProcessor props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streams = new KafkaStreams(builder.build(), props); + Topology topology = builder.build(); + log.info("Topology:\n-----------------\n\n{}-----------------", topology.describe()); + + streams = new KafkaStreams(topology, props); streams.setUncaughtExceptionHandler((Throwable e) -> { log.error("Unexpected error!", e);