From: Kai Moritz Date: Sat, 4 Sep 2021 15:20:44 +0000 (+0200) Subject: top10:1.0.0 - ranks first 10 mostly used words, keyed by user X-Git-Tag: top10-1.0.0 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=4b94d31fbd663cb277276def106be9873ec4a246 top10:1.0.0 - ranks first 10 mostly used words, keyed by user --- diff --git a/pom.xml b/pom.xml index 71115c0..9bda638 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - counter + top10 1.0.0 - Wordcount-Counter - Word-counting stream-processor of the multi-user wordcount-example + Wordcount-Top-10 + Top-10 stream-processor of the multi-user wordcount-example 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java deleted file mode 100644 index 1f73d32..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; - - -@SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -public class CounterApplication -{ - public static void main(String[] args) - { - SpringApplication.run(CounterApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java deleted file mode 100644 index d670ba2..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import 
org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.counter") -@Getter -@Setter -@ToString -public class CounterApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; - private String inputTopic = "recordings"; - private String outputTopic = "countings"; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java deleted file mode 100644 index e8d7c11..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ /dev/null @@ -1,97 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Slf4j -@Component -public class CounterStreamProcessor -{ - final static Pattern PATTERN = Pattern.compile("\\W+"); - - public final KafkaStreams streams; - - - public CounterStreamProcessor( - CounterApplicationProperties 
properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) - { - StreamsBuilder builder = new StreamsBuilder(); - - KStream source = builder.stream(properties.getInputTopic()); - source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) - .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .count() - .mapValues(value->Long.toString(value)) - .toStream() - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java deleted file mode 100644 index 1e00dca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") 
-public class Key
-{
-  private final String username;
-  private final String word;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java
new file mode 100644
index 0000000..67f45f2
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.Value;
+
+
+/**
+ * A word together with its count, created via {@code Entry.of(word, count)}.
+ */
+@Value(staticConstructor = "of")
+public class Entry
+{
+  private final String word;
+  private final Long count;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java
new file mode 100644
index 0000000..d09dbcc
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.Getter;
+import lombok.Setter;
+
+
+/**
+ * Bean with the two parts of a record key: the username and the word.
+ */
+@Getter
+@Setter
+public class Key
+{
+  private String username;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
new file mode 100644
index 0000000..b748fe5
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
@@ -0,0 +1,58 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * Ranking of words by descending count, limited to {@code MAX_ENTRIES}
+ * entries.
+ */
+@Getter
+@Setter
+public class Ranking
+{
+  /** Maximum number of entries kept in the ranking. */
+  private static final int MAX_ENTRIES = 10;
+
+  private Entry[] entries = new Entry[0];
+
+  /**
+   * Inserts the given entry at its position in the ranking.
+   * <p>
+   * An entry already stored for the same word is replaced. The new entry is
+   * placed in front of all entries whose count is less than or equal to its
+   * own, or at the end if every stored count is higher. Afterwards the
+   * ranking is cut back to {@code MAX_ENTRIES} entries.
+   *
+   * @param newEntry the word and its current count
+   */
+  public void add(Entry newEntry)
+  {
+    List<Entry> list = new ArrayList<>(Arrays.asList(entries));
+
+    // A word must appear only once: drop its outdated entry first.
+    list.removeIf(entry -> entry.getWord().equals(newEntry.getWord()));
+
+    // Skip the entries that rank higher than the new one. If all of them do,
+    // position ends up as list.size() and the entry is appended, so that a
+    // lowest-count entry is not lost while the ranking has room left.
+    int position = 0;
+    while (position < list.size()
+        && list.get(position).getCount() > newEntry.getCount())
+    {
+      position++;
+    }
+    list.add(position, newEntry);
+
+    if (list.size() > MAX_ENTRIES)
+    {
+      list = list.subList(0, MAX_ENTRIES);
+    }
+    entries = list.toArray(new Entry[0]);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java
new file mode 100644
index 0000000..27dca95
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java
@@ -0,0 +1,19 @@
+package de.juplo.kafka.wordcount.top10;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+
+/**
+ * Spring Boot entry point of the Top-10 stream-processor.
+ */
+@SpringBootApplication
+@EnableConfigurationProperties(Top10ApplicationProperties.class)
+public class Top10Application
+{
+  public static void main(String[] args)
+  {
+    SpringApplication.run(Top10Application.class, args);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java
new file mode 100644
index 0000000..93b78ec
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java
@@ -0,0 +1,24 @@
+package de.juplo.kafka.wordcount.top10;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+/**
+ * Settings bound to the prefix {@code juplo.wordcount.top10}; the field
+ * initializers are the defaults.
+ */
+@ConfigurationProperties("juplo.wordcount.top10")
+@Getter
+@Setter
+@ToString
+public class Top10ApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "top10";
+  private String inputTopic = "countings";
+  private String outputTopic = "top10";
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
new file mode 100644
index 0000000..862913a
--- /dev/null
+++ 
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
@@ -0,0 +1,132 @@
+package de.juplo.kafka.wordcount.top10;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+/**
+ * Kafka Streams processor that reads word counts keyed by a JSON-encoded
+ * {@link Key}, folds them into one {@link Ranking} per username and writes
+ * the JSON-encoded ranking to the output topic.
+ */
+@Slf4j
+@Component
+public class Top10StreamProcessor
+{
+  // Not referenced anywhere in this class.
+  static final Pattern PATTERN = Pattern.compile("\\W+");
+
+  public final KafkaStreams streams;
+
+
+  /**
+   * Builds the topology and creates the {@link KafkaStreams} instance; the
+   * instance is not started here, see {@link #start()}.
+   *
+   * @param properties topic names, application id and bootstrap server
+   * @param mapper used to convert keys, entries and rankings from and to JSON
+   * @param context application context that is exited on an uncaught error
+   */
+  public Top10StreamProcessor(
+      Top10ApplicationProperties properties,
+      ObjectMapper mapper,
+      ConfigurableApplicationContext context)
+  {
+    StreamsBuilder builder = new StreamsBuilder();
+
+    builder
+        // Explicit type witness: without it the lambda parameters below are
+        // inferred as Object and readValue/parseLong do not compile.
+        .<String, String>stream(properties.getInputTopic())
+        // Re-key by username; the value becomes the JSON of (word, count).
+        .map((keyJson, countStr) ->
+        {
+          try
+          {
+            Key key = mapper.readValue(keyJson, Key.class);
+            Long count = Long.parseLong(countStr);
+            Entry entry = Entry.of(key.getWord(), count);
+            String entryJson = mapper.writeValueAsString(entry);
+            return new KeyValue<>(key.getUsername(), entryJson);
+          }
+          catch (JsonProcessingException e)
+          {
+            throw new RuntimeException(e);
+          }
+        })
+        .groupByKey()
+        .aggregate(
+            // Initial aggregate: the JSON of an empty ranking.
+            () -> "{\"entries\" : []}",
+            (username, entryJson, rankingJson) ->
+            {
+              try
+              {
+                Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+                ranking.add(mapper.readValue(entryJson, Entry.class));
+                return mapper.writeValueAsString(ranking);
+              }
+              catch (JsonProcessingException e)
+              {
+                throw new RuntimeException(e);
+              }
+            }
+        )
+        .toStream()
+        .to(properties.getOutputTopic());
+
+    Properties props = new Properties();
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+    streams = new KafkaStreams(builder.build(), props);
+    streams.setUncaughtExceptionHandler((Throwable e) ->
+    {
+      log.error("Unexpected error!", e);
+      // The exit runs asynchronously; the handler itself returns SHUTDOWN_CLIENT.
+      CompletableFuture.runAsync(() ->
+      {
+        log.info("Stopping application...");
+        SpringApplication.exit(context, () -> 1);
+      });
+      return SHUTDOWN_CLIENT;
+    });
+  }
+
+  /** Starts the streams instance. */
+  @PostConstruct
+  public void start()
+  {
+    log.info("Starting Stream-Processor");
+    streams.start();
+  }
+
+  /** Closes the streams instance. */
+  @PreDestroy
+  public void stop()
+  {
+    log.info("Stopping Stream-Processor");
+    streams.close();
+  }
+}