From: Kai Moritz Date: Thu, 2 Sep 2021 05:15:37 +0000 (+0200) Subject: query:1.0.0 - Query rankings by username X-Git-Tag: query-1.0.0 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3bfe34e8bc0539932cf93361fe6f710738b37897;p=demos%2Fkafka%2Fwordcount query:1.0.0 - Query rankings by username --- diff --git a/pom.xml b/pom.xml index 9bda638..d641d39 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - top10 + query 1.0.0 - Wordcount-Top-10 - Top-10 stream-processor of the multi-user wordcount-example + Wordcount-Query + Query stream-processor of the multi-user wordcount-example 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java new file mode 100644 index 0000000..4866e72 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class Entry +{ + private final String word; + private final Long count; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java new file mode 100644 index 0000000..be34ba8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Getter; +import lombok.Setter; + + +@Getter +@Setter +public class Key +{ + private String username; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java new file mode 100644 index 0000000..995c1a1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.query; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + + +@SpringBootApplication +@EnableConfigurationProperties(QueryApplicationProperties.class) +public class QueryApplication +{ + public static void main(String[] args) + { + SpringApplication.run(QueryApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java new file mode 100644 index 0000000..19265f1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.query; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.query") +@Getter +@Setter +@ToString +public class QueryApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "query"; + private String rankingInputTopic = "top10"; + private String usersInputTopic = "users"; + private String applicationServer = "localhost:8080"; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java new file mode 100644 index 0000000..14006b6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URI; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +public class QueryController +{ + private final QueryStreamProcessor processor; + + @GetMapping("{username}") + ResponseEntity queryFor(@PathVariable String username) + { + Optional redirect = processor.getRedirect(username); + if (redirect.isPresent()) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(redirect.get()) + .build(); + } + + return ResponseEntity.of(processor.getRanking(username)); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java new file mode 100644 index 0000000..319861d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -0,0 +1,119 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.boot.SpringApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.net.URI; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Slf4j +@Component +public class QueryStreamProcessor +{ + public final KafkaStreams streams; + public final HostInfo hostInfo; + public final String storeName = "rankingsByUsername"; + public final StoreQueryParameters> storeParameters; + public final ObjectMapper mapper; + + + public QueryStreamProcessor( + QueryApplicationProperties properties, + ObjectMapper mapper, + ConfigurableApplicationContext context) + { + StreamsBuilder builder = new StreamsBuilder(); + + builder.table(properties.getRankingInputTopic(), Materialized.as(storeName)); + + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + streams = new KafkaStreams(builder.build(), props); + streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer()); + storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());; + this.mapper = mapper; + } + + public Optional getRedirect(String username) + { + KeyQueryMetadata metadata = streams.queryMetadataForKey(storeName, username, Serdes.String().serializer()); + HostInfo activeHost = metadata.activeHost(); + log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); + + if (activeHost.equals(this.hostInfo)) + { + return Optional.empty(); + } + + URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); + log.debug("Redirecting to {}", location); + return Optional.of(location); + } + + public Optional getRanking(String username) + { + return + Optional + .ofNullable(streams.store(storeParameters).get(username)) + .map(json -> + { + try + { + return mapper.readValue(json, Ranking.class); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + }); + } + + @PostConstruct + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + @PreDestroy + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java new file mode 100644 index 0000000..69ae3aa --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + + +@Getter +@Setter +public class Ranking +{ + private Entry[] entries; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java deleted file mode 100644 index 67f45f2..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Entry -{ - private final String word; - private final Long count; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java deleted file mode 100644 index d09dbcc..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Getter; -import lombok.Setter; - - -@Getter -@Setter -public class Key -{ - private String username; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java deleted file mode 100644 index b748fe5..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ /dev/null @@ -1,53 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Getter; -import lombok.Setter; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - - -@Getter -@Setter -public class Ranking -{ - private Entry[] entries = new Entry[0]; - - public void add(Entry newEntry) - { - if (entries.length == 0) - { - entries = new Entry[1]; - entries[0] = newEntry; - return; - } - - List list = new LinkedList<>(Arrays.asList(entries)); - for (int i = 0; i < list.size(); i++) - { - Entry entry; - - entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) - { - list.add(i, newEntry); - for (int j = i+1; j < list.size(); j++) - { - entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) - { - list.remove(j); - break; - } - } - if (list.size() > 10) - { - list = list.subList(0,10); - } - entries = list.toArray(num -> new Entry[num]); - return; - } - } - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java deleted file mode 100644 index 27dca95..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; - - -@SpringBootApplication -@EnableConfigurationProperties(Top10ApplicationProperties.class) -public class Top10Application -{ - public static void main(String[] args) - { - SpringApplication.run(Top10Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java deleted file mode 100644 index 93b78ec..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.top10") -@Getter -@Setter -@ToString -public class Top10ApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "top10"; - private String inputTopic = "countings"; - private String outputTopic = "top10"; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java deleted file mode 100644 index 862913a..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ /dev/null @@ -1,111 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Slf4j -@Component -public class Top10StreamProcessor -{ - final static Pattern PATTERN = Pattern.compile("\\W+"); - - public final KafkaStreams streams; - - - public Top10StreamProcessor( - Top10ApplicationProperties properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) - { - StreamsBuilder builder = new StreamsBuilder(); - - builder - .stream(properties.getInputTopic()) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) - .toStream() - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -}