X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessor.java;h=2bc1da7ddd544864909a9fc882b8b84daaeef9d7;hb=42bb92cf8fca73ddf940f2257c7fa5f693bc2ca4;hp=276ca8e7239021880d6a86cec9d27b4367daf7bc;hpb=46d16efa66a06c24aa2286b4dd14676f9168b4e2;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 276ca8e..2bc1da7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -6,19 +6,24 @@ import de.juplo.kafka.wordcount.avro.Key; import de.juplo.kafka.wordcount.avro.Ranking; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -43,29 +48,63 @@ public class Top10StreamProcessor StreamsBuilder builder = new StreamsBuilder(); builder - .stream(properties.getInputTopic()) + .stream(properties.getInputTopic(), Consumed.with(null, Serdes.Long())) .map((key, count) -> new KeyValue<>( key.getUsername(), Entry.newBuilder().setWord(key.getWord()).setCount(count).build())) - .groupByKey() + .groupByKey(Grouped.keySerde(Serdes.String())) .aggregate( - () -> Ranking.newBuilder().build(), - (username, entry, ranking) -> { - ranking.getEntries().add(entry); + () -> Ranking.newBuilder().setEntries(new LinkedList()).build(), + (username, newEntry, ranking) -> { + List entries = new LinkedList<>(ranking.getEntries()); + + if (entries.isEmpty()) + { + entries.add(newEntry); + } + else + { + for (int i = 0; i < entries.size(); i++) + { + Entry entry = entries.get(i); + if (entry.getCount() <= newEntry.getCount()) + { + entries.add(i, newEntry); + for (int j = i + 1; j < entries.size(); j++) + { + entry = entries.get(j); + if (entry.getWord().equals(newEntry.getWord())) + { + entries.remove(j); + break; + } + } + if (entries.size() > 10) + { + entries = entries.subList(0, 10); + } + } + } + } + + ranking.setEntries(entries); return ranking; }) .toStream() - .to(properties.getOutputTopic()); + .to(properties.getOutputTopic(), Produced.keySerde(Serdes.String())); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streams = new KafkaStreams(builder.build(), props); + Topology topology = builder.build(); + log.info("Topology:\n-----------------\n\n{}-----------------", topology.describe()); + + streams = new KafkaStreams(topology, props); streams.setUncaughtExceptionHandler((Throwable e) -> { log.error("Unexpected error!", e);