From c8fc265aa2a58e29cb88dedadadf485b4d12a418 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Oct 2021 18:15:00 +0200 Subject: [PATCH] WIP --- pom.xml | 1 + .../de/juplo/kafka/wordcount/query/Entry.java | 11 -------- .../wordcount/query/QueryController.java | 2 +- .../wordcount/query/QueryStreamProcessor.java | 27 ++++++++++--------- ...rRanking.java => UserRankingResponse.java} | 10 ++++++- src/main/resources/avro/ranking.avsc | 6 ++--- src/main/resources/avro/user.avsc | 6 ++--- src/main/resources/avro/userranking.avsc | 6 ++--- 8 files changed, 35 insertions(+), 34 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/query/Entry.java rename src/main/java/de/juplo/kafka/wordcount/query/{UserRanking.java => UserRankingResponse.java} (60%) diff --git a/pom.xml b/pom.xml index 0174f15..f992a6b 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,7 @@ ${project.basedir}/src/main/resources/avro ${project.basedir}/target/generated-sources PRIVATE + String *.avsc diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java deleted file mode 100644 index 4866e72..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Entry -{ - private final String word; - private final Long count; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java index a9b5b80..e4daa48 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java @@ -19,7 +19,7 @@ public class QueryController private final QueryStreamProcessor processor; @GetMapping("{username}") - ResponseEntity queryFor(@PathVariable String username) + ResponseEntity queryFor(@PathVariable String username) { Optional redirect = processor.getRedirect(username); if (redirect.isPresent()) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 6dcf8b1..b52e617 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -1,7 +1,10 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.avro.Entry; +import de.juplo.kafka.wordcount.avro.Ranking; +import de.juplo.kafka.wordcount.avro.User; +import de.juplo.kafka.wordcount.avro.UserRanking; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import lombok.extern.slf4j.Slf4j; @@ -35,7 +38,7 @@ public class QueryStreamProcessor public final KafkaStreams streams; public final HostInfo hostInfo; public final String storeName = "rankingsByUsername"; - public final StoreQueryParameters> storeParameters; + public final StoreQueryParameters> storeParameters; public QueryStreamProcessor( @@ -45,12 +48,12 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(properties.getUsersInputTopic()); - KStream rankings = builder.stream(properties.getRankingInputTopic()); + KTable users = builder.table(properties.getUsersInputTopic()); + KStream rankings = builder.stream(properties.getRankingInputTopic()); rankings .join(users, (ranking, user) -> - UserRankingTO + UserRanking .newBuilder() .setFirstName(user.getFirstName()) .setLastName((user.getLastName())) @@ -99,19 +102,19 @@ public class QueryStreamProcessor return Optional.of(location); } - public Optional getUserRanking(String username) + public Optional getUserRanking(String username) { return Optional .ofNullable(streams.store(storeParameters).get(username)) - .map(userRankingTO -> UserRanking.of( - userRankingTO.getFirstName().toString(), - userRankingTO.getLastName().toString(), - userRankingTO + .map(userRanking -> UserRankingResponse.of( + userRanking.getFirstName(), + userRanking.getLastName(), + userRanking .getTop10() .stream() - .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount())) - .toArray(size -> new Entry[size]))); + .map((Entry entry) -> UserRankingResponse.Entry.of(entry.getWord(), entry.getCount())) + .toArray(size -> new UserRankingResponse.Entry[size]))); } @PostConstruct diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java similarity index 60% rename from src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java rename to src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java index aa5d11e..e783f87 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java @@ -6,9 +6,17 @@ import lombok.Value; @Value @RequiredArgsConstructor(staticName = "of") -public class UserRanking +public class UserRankingResponse { private final String firstName; private final String lastName; private final Entry[] top10; + + + @Value(staticConstructor = "of") + public static class Entry + { + private final String word; + private final Long count; + } } diff --git a/src/main/resources/avro/ranking.avsc b/src/main/resources/avro/ranking.avsc index 41fcdcf..37e0f44 100644 --- a/src/main/resources/avro/ranking.avsc +++ b/src/main/resources/avro/ranking.avsc @@ -1,14 +1,14 @@ { "type": "record", - "namespace": "de.juplo.kafka.wordcount.query", - "name": "RankingTO", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "Ranking", "fields": [ { "name": "entries", "type": { "type": "array", "items": { - "name": "EntryTO", + "name": "Entry", "type": "record", "fields":[ { "name": "word", diff --git a/src/main/resources/avro/user.avsc b/src/main/resources/avro/user.avsc index 72541c0..012b876 100644 --- a/src/main/resources/avro/user.avsc +++ b/src/main/resources/avro/user.avsc @@ -1,7 +1,7 @@ { "type": "record", - "namespace": "de.juplo.kafka.wordcount.query", - "name": "UserTO", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "User", "fields": [ { "name": "username", "type": "string" @@ -14,7 +14,7 @@ }, { "name": "sex", "type": { - "type": "enum", "name": "SexTO", + "type": "enum", "name": "Sex", "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN" }, "default": "UNKNOWN" diff --git a/src/main/resources/avro/userranking.avsc b/src/main/resources/avro/userranking.avsc index 591b517..de8ec87 100644 --- a/src/main/resources/avro/userranking.avsc +++ b/src/main/resources/avro/userranking.avsc @@ -1,7 +1,7 @@ { "type": "record", - "namespace": "de.juplo.kafka.wordcount.query", - "name": "UserRankingTO", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "UserRanking", "fields": [ { "name": "firstName", "type": "string", "default": "" @@ -14,7 +14,7 @@ "type": { "type": "array", "items": { - "name": "EntryTO", + "name": "Entry", "type": "record", "fields":[ { "name": "word", -- 2.20.1