]> juplo.de Git - demos/kafka/wordcount/commitdiff
WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 16:15:00 +0000 (18:15 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 16:15:00 +0000 (18:15 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/query/Entry.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryController.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java [new file with mode: 0644]
src/main/resources/avro/ranking.avsc
src/main/resources/avro/user.avsc
src/main/resources/avro/userranking.avsc

diff --git a/pom.xml b/pom.xml
index 0174f15f80e035a8dcdf03c095b13d7c5538040f..f992a6bcad602d5177067adcf56f06285504cc61 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                                                        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                                                        <outputDirectory>${project.basedir}/target/generated-sources</outputDirectory>
                                                        <fieldVisibility>PRIVATE</fieldVisibility>
+                                                       <stringType>String</stringType>
                                                        <includes>
                                                                <include>*.avsc</include>
                                                        </includes>
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java
deleted file mode 100644 (file)
index 4866e72..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class Entry
-{
-  private final String word;
-  private final Long count;
-}
index a9b5b80a6be7c1b55cdfa576d1d03d7cfe6e8343..e4daa4845b91e5166337eb8c05ed9c0dfd404821 100644 (file)
@@ -19,7 +19,7 @@ public class QueryController
   private final QueryStreamProcessor processor;
 
   @GetMapping("{username}")
-  ResponseEntity<UserRanking> queryFor(@PathVariable String username)
+  ResponseEntity<UserRankingResponse> queryFor(@PathVariable String username)
   {
     Optional<URI> redirect = processor.getRedirect(username);
     if (redirect.isPresent())
index 6dcf8b1238aad9f095fcfa38a7ab7a85a044b4cf..b52e6178b6ea7ea2fb9ce038dc56e89a920f9173 100644 (file)
@@ -1,7 +1,10 @@
 package de.juplo.kafka.wordcount.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Entry;
+import de.juplo.kafka.wordcount.avro.Ranking;
+import de.juplo.kafka.wordcount.avro.User;
+import de.juplo.kafka.wordcount.avro.UserRanking;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import lombok.extern.slf4j.Slf4j;
@@ -35,7 +38,7 @@ public class QueryStreamProcessor
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
        public final String storeName = "rankingsByUsername";
-       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRankingTO>> storeParameters;
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
 
 
        public QueryStreamProcessor(
@@ -45,12 +48,12 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, UserTO> users = builder.table(properties.getUsersInputTopic());
-               KStream<String, RankingTO> rankings = builder.stream(properties.getRankingInputTopic());
+               KTable<String, User> users = builder.table(properties.getUsersInputTopic());
+               KStream<String, Ranking> rankings = builder.stream(properties.getRankingInputTopic());
 
                rankings
                                .join(users, (ranking, user) ->
-                                               UserRankingTO
+                                               UserRanking
                                                                .newBuilder()
                                                                .setFirstName(user.getFirstName())
                                                                .setLastName((user.getLastName()))
@@ -99,19 +102,19 @@ public class QueryStreamProcessor
                return Optional.of(location);
        }
 
-       public Optional<UserRanking> getUserRanking(String username)
+       public Optional<UserRankingResponse> getUserRanking(String username)
        {
                return
                                Optional
                                                .ofNullable(streams.store(storeParameters).get(username))
-                                               .map(userRankingTO -> UserRanking.of(
-                                                               userRankingTO.getFirstName().toString(),
-                                                               userRankingTO.getLastName().toString(),
-                                                               userRankingTO
+                                               .map(userRanking -> UserRankingResponse.of(
+                                                               userRanking.getFirstName(),
+                                                               userRanking.getLastName(),
+                                                               userRanking
                                                                                .getTop10()
                                                                                .stream()
-                                                                               .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount()))
-                                                                               .toArray(size -> new Entry[size])));
+                                                                               .map((Entry entry) -> UserRankingResponse.Entry.of(entry.getWord(), entry.getCount()))
+                                                                               .toArray(size -> new UserRankingResponse.Entry[size])));
        }
 
        @PostConstruct
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java
deleted file mode 100644 (file)
index aa5d11e..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
-
-
-@Value
-@RequiredArgsConstructor(staticName = "of")
-public class UserRanking
-{
-  private final String firstName;
-  private final String lastName;
-  private final Entry[] top10;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRankingResponse.java
new file mode 100644 (file)
index 0000000..e783f87
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+
+
+@Value
+@RequiredArgsConstructor(staticName = "of")
+public class UserRankingResponse
+{
+  private final String firstName;
+  private final String lastName;
+  private final Entry[] top10;
+
+
+  @Value(staticConstructor = "of")
+  public static class Entry
+  {
+    private final String word;
+    private final Long count;
+  }
+}
index 41fcdcfb8950a68e172cabb53933d04f69f36b83..37e0f4425475ed9607ae80f43ff72df65718993b 100644 (file)
@@ -1,14 +1,14 @@
 {
     "type": "record",
-    "namespace": "de.juplo.kafka.wordcount.query",
-    "name": "RankingTO",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "Ranking",
     "fields": [
         {
             "name": "entries",
             "type": {
                 "type": "array",
                 "items": {
-                    "name": "EntryTO",
+                    "name": "Entry",
                     "type": "record",
                     "fields":[
                         {   "name": "word",
index 72541c07ca47b7dd07a5a599405d955c62164562..012b876bc8c958ae514ffe8bc47cf311cd4bd878 100644 (file)
@@ -1,7 +1,7 @@
 {
     "type": "record",
-    "namespace": "de.juplo.kafka.wordcount.query",
-    "name": "UserTO",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "User",
     "fields": [
         {
             "name": "username", "type": "string"
@@ -14,7 +14,7 @@
         },
         { "name": "sex",   "type":
             {
-                "type": "enum", "name": "SexTO",
+                "type": "enum", "name": "Sex",
                 "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN"
             },
             "default": "UNKNOWN"
index 591b517d8c077f6ee06e65377cb97bb6c63252f6..de8ec870b1c203c74dc4409b3d01cc2b5ca177cb 100644 (file)
@@ -1,7 +1,7 @@
 {
     "type": "record",
-    "namespace": "de.juplo.kafka.wordcount.query",
-    "name": "UserRankingTO",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "UserRanking",
     "fields": [
         {
             "name": "firstName", "type": "string", "default": ""
@@ -14,7 +14,7 @@
             "type": {
                 "type": "array",
                 "items": {
-                    "name": "EntryTO",
+                    "name": "Entry",
                     "type": "record",
                     "fields":[
                         {   "name": "word",