From 3bfe34e8bc0539932cf93361fe6f710738b37897 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 2 Sep 2021 07:15:37 +0200 Subject: [PATCH] query:1.0.0 - Query rankings by username --- pom.xml | 6 +- .../wordcount/{top10 => query}/Entry.java | 2 +- .../kafka/wordcount/{top10 => query}/Key.java | 2 +- .../QueryApplication.java} | 8 +- .../query/QueryApplicationProperties.java | 21 ++++ .../wordcount/query/QueryController.java | 35 ++++++ .../QueryStreamProcessor.java} | 102 ++++++++++-------- .../juplo/kafka/wordcount/query/Ranking.java | 16 +++ .../juplo/kafka/wordcount/top10/Ranking.java | 53 --------- .../top10/Top10ApplicationProperties.java | 20 ---- 10 files changed, 136 insertions(+), 129 deletions(-) rename src/main/java/de/juplo/kafka/wordcount/{top10 => query}/Entry.java (77%) rename src/main/java/de/juplo/kafka/wordcount/{top10 => query}/Key.java (77%) rename src/main/java/de/juplo/kafka/wordcount/{top10/Top10Application.java => query/QueryApplication.java} (59%) create mode 100644 src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/query/QueryController.java rename src/main/java/de/juplo/kafka/wordcount/{top10/Top10StreamProcessor.java => query/QueryStreamProcessor.java} (52%) create mode 100644 src/main/java/de/juplo/kafka/wordcount/query/Ranking.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java diff --git a/pom.xml b/pom.xml index 9bda638..d641d39 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - top10 + query 1.0.0 - Wordcount-Top-10 - Top-10 stream-processor of the multi-user wordcount-example + Wordcount-Query + Query stream-processor of the multi-user wordcount-example 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java similarity index 77% rename from src/main/java/de/juplo/kafka/wordcount/top10/Entry.java rename to src/main/java/de/juplo/kafka/wordcount/query/Entry.java index 67f45f2..4866e72 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.query; import lombok.Value; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java similarity index 77% rename from src/main/java/de/juplo/kafka/wordcount/top10/Key.java rename to src/main/java/de/juplo/kafka/wordcount/query/Key.java index d09dbcc..be34ba8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.query; import lombok.Getter; import lombok.Setter; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java similarity index 59% rename from src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java rename to src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java index 27dca95..995c1a1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.query; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -6,11 +6,11 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties @SpringBootApplication -@EnableConfigurationProperties(Top10ApplicationProperties.class) -public class Top10Application +@EnableConfigurationProperties(QueryApplicationProperties.class) +public class QueryApplication { public static void main(String[] args) { - SpringApplication.run(Top10Application.class, args); + SpringApplication.run(QueryApplication.class, args); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java new file mode 100644 index 0000000..19265f1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.query; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.query") +@Getter +@Setter +@ToString +public class QueryApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "query"; + private String rankingInputTopic = "top10"; + private String usersInputTopic = "users"; + private String applicationServer = "localhost:8080"; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java new file mode 100644 index 0000000..14006b6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URI; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +public class QueryController +{ + private final QueryStreamProcessor processor; + + @GetMapping("{username}") + ResponseEntity queryFor(@PathVariable String username) + { + Optional redirect = processor.getRedirect(username); + if (redirect.isPresent()) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(redirect.get()) + .build(); + } + + return ResponseEntity.of(processor.getRanking(username)); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java similarity index 52% rename from src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java rename to src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 862913a..319861d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -1,82 +1,52 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.net.URI; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Slf4j @Component -public class Top10StreamProcessor +public class QueryStreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); - public final KafkaStreams streams; + public final HostInfo hostInfo; + public final String storeName = "rankingsByUsername"; + public final StoreQueryParameters> storeParameters; + public final ObjectMapper mapper; - public Top10StreamProcessor( - Top10ApplicationProperties properties, + public QueryStreamProcessor( + QueryApplicationProperties properties, ObjectMapper mapper, ConfigurableApplicationContext context) { StreamsBuilder builder = new StreamsBuilder(); - builder - .stream(properties.getInputTopic()) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) - .toStream() - .to(properties.getOutputTopic()); + builder.table(properties.getRankingInputTopic(), Materialized.as(storeName)); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); @@ -93,6 +63,44 @@ public class Top10StreamProcessor }); return SHUTDOWN_CLIENT; }); + + hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer()); + storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());; + this.mapper = mapper; + } + + public Optional getRedirect(String username) + { + KeyQueryMetadata metadata = streams.queryMetadataForKey(storeName, username, Serdes.String().serializer()); + HostInfo activeHost = metadata.activeHost(); + log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); + + if (activeHost.equals(this.hostInfo)) + { + return Optional.empty(); + } + + URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); + log.debug("Redirecting to {}", location); + return Optional.of(location); + } + + public Optional getRanking(String username) + { + return + Optional + .ofNullable(streams.store(storeParameters).get(username)) + .map(json -> + { + try + { + return mapper.readValue(json, Ranking.class); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + }); } @PostConstruct diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java new file mode 100644 index 0000000..69ae3aa --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + + +@Getter +@Setter +public class Ranking +{ + private Entry[] entries; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java deleted file mode 100644 index b748fe5..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ /dev/null @@ -1,53 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Getter; -import lombok.Setter; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - - -@Getter -@Setter -public class Ranking -{ - private Entry[] entries = new Entry[0]; - - public void add(Entry newEntry) - { - if (entries.length == 0) - { - entries = new Entry[1]; - entries[0] = newEntry; - return; - } - - List list = new LinkedList<>(Arrays.asList(entries)); - for (int i = 0; i < list.size(); i++) - { - Entry entry; - - entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) - { - list.add(i, newEntry); - for (int j = i+1; j < list.size(); j++) - { - entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) - { - list.remove(j); - break; - } - } - if (list.size() > 10) - { - list = list.subList(0,10); - } - entries = list.toArray(num -> new Entry[num]); - return; - } - } - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java deleted file mode 100644 index 93b78ec..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.top10") -@Getter -@Setter -@ToString -public class Top10ApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "top10"; - private String inputTopic = "countings"; - private String outputTopic = "top10"; -} -- 2.20.1