WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 16:15:00 +0000 (18:15 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 16:15:00 +0000 (18:15 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/query/Entry.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryController.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java [new file with mode: 0644]
src/main/resources/avro/ranking.avsc
src/main/resources/avro/user.avsc
src/main/resources/avro/userranking.avsc

diff --git a/pom.xml b/pom.xml
index 0174f15..f992a6b 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                                                        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                                                        <outputDirectory>${project.basedir}/target/generated-sources</outputDirectory>
                                                        <fieldVisibility>PRIVATE</fieldVisibility>
+                                                       <stringType>String</stringType>
                                                        <includes>
                                                                <include>*.avsc</include>
                                                        </includes>
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java
deleted file mode 100644 (file)
index 4866e72..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class Entry
-{
-  private final String word;
-  private final Long count;
-}
index a9b5b80..e4daa48 100644 (file)
@@ -19,7 +19,7 @@ public class QueryController
   private final QueryStreamProcessor processor;
 
   @GetMapping("{username}")
-  ResponseEntity<UserRanking> queryFor(@PathVariable String username)
+  ResponseEntity<UserRankingResponse> queryFor(@PathVariable String username)
   {
     Optional<URI> redirect = processor.getRedirect(username);
     if (redirect.isPresent())
index 6dcf8b1..b52e617 100644 (file)
@@ -1,7 +1,10 @@
 package de.juplo.kafka.wordcount.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Entry;
+import de.juplo.kafka.wordcount.avro.Ranking;
+import de.juplo.kafka.wordcount.avro.User;
+import de.juplo.kafka.wordcount.avro.UserRanking;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import lombok.extern.slf4j.Slf4j;
@@ -35,7 +38,7 @@ public class QueryStreamProcessor
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
        public final String storeName = "rankingsByUsername";
-       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRankingTO>> storeParameters;
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
 
 
        public QueryStreamProcessor(
@@ -45,12 +48,12 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, UserTO> users = builder.table(properties.getUsersInputTopic());
-               KStream<String, RankingTO> rankings = builder.stream(properties.getRankingInputTopic());
+               KTable<String, User> users = builder.table(properties.getUsersInputTopic());
+               KStream<String, Ranking> rankings = builder.stream(properties.getRankingInputTopic());
 
                rankings
                                .join(users, (ranking, user) ->
-                                               UserRankingTO
+                                               UserRanking
                                                                .newBuilder()
                                                                .setFirstName(user.getFirstName())
                                                                .setLastName((user.getLastName()))
@@ -99,19 +102,19 @@ public class QueryStreamProcessor
                return Optional.of(location);
        }
 
-       public Optional<UserRanking> getUserRanking(String username)
+       public Optional<UserRankingResponse> getUserRanking(String username)
        {
                return
                                Optional
                                                .ofNullable(streams.store(storeParameters).get(username))
-                                               .map(userRankingTO -> UserRanking.of(
-                                                               userRankingTO.getFirstName().toString(),
-                                                               userRankingTO.getLastName().toString(),
-                                                               userRankingTO
+                                               .map(userRanking -> UserRankingResponse.of(
+                                                               userRanking.getFirstName(),
+                                                               userRanking.getLastName(),
+                                                               userRanking
                                                                                .getTop10()
                                                                                .stream()
-                                                                               .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount()))
-                                                                               .toArray(size -> new Entry[size])));
+                                                                               .map((Entry entry) -> UserRankingResponse.Entry.of(entry.getWord(), entry.getCount()))
+                                                                               .toArray(size -> new UserRankingResponse.Entry[size])));
        }
 
        @PostConstruct
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java
deleted file mode 100644 (file)
index aa5d11e..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
-
-
-@Value
-@RequiredArgsConstructor(staticName = "of")
-public class UserRanking
-{
-  private final String firstName;
-  private final String lastName;
-  private final Entry[] top10;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java
new file mode 100644 (file)
index 0000000..e783f87
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+
+
+@Value
+@RequiredArgsConstructor(staticName = "of")
+public class UserRankingResponse
+{
+  private final String firstName;
+  private final String lastName;
+  private final Entry[] top10;
+
+
+  @Value(staticConstructor = "of")
+  public static class Entry
+  {
+    private final String word;
+    private final Long count;
+  }
+}
index 41fcdcf..37e0f44 100644 (file)
@@ -1,14 +1,14 @@
 {
     "type": "record",
-    "namespace": "de.juplo.kafka.wordcount.query",
-    "name": "RankingTO",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "Ranking",
     "fields": [
         {
             "name": "entries",
             "type": {
                 "type": "array",
                 "items": {
-                    "name": "EntryTO",
+                    "name": "Entry",
                     "type": "record",
                     "fields":[
                         {   "name": "word",
index 72541c0..012b876 100644 (file)
@@ -1,7 +1,7 @@
 {
     "type": "record",
-    "namespace": "de.juplo.kafka.wordcount.query",
-    "name": "UserTO",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "User",
     "fields": [
         {
             "name": "username", "type": "string"
@@ -14,7 +14,7 @@
         },
         { "name": "sex",   "type":
             {
-                "type": "enum", "name": "SexTO",
+                "type": "enum", "name": "Sex",
                 "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN"
             },
             "default": "UNKNOWN"
index 591b517..de8ec87 100644 (file)
@@ -1,7 +1,7 @@
 {
     "type": "record",
-    "namespace": "de.juplo.kafka.wordcount.query",
-    "name": "UserRankingTO",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "UserRanking",
     "fields": [
         {
             "name": "firstName", "type": "string", "default": ""
@@ -14,7 +14,7 @@
             "type": {
                 "type": "array",
                 "items": {
-                    "name": "EntryTO",
+                    "name": "Entry",
                     "type": "record",
                     "fields":[
                         {   "name": "word",