WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
index 6dcf8b1..b52e617 100644 (file)
@@ -1,7 +1,10 @@
 package de.juplo.kafka.wordcount.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Entry;
+import de.juplo.kafka.wordcount.avro.Ranking;
+import de.juplo.kafka.wordcount.avro.User;
+import de.juplo.kafka.wordcount.avro.UserRanking;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import lombok.extern.slf4j.Slf4j;
@@ -35,7 +38,7 @@ public class QueryStreamProcessor
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
        public final String storeName = "rankingsByUsername";
-       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRankingTO>> storeParameters;
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
 
 
        public QueryStreamProcessor(
@@ -45,12 +48,12 @@ public class QueryStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, UserTO> users = builder.table(properties.getUsersInputTopic());
-               KStream<String, RankingTO> rankings = builder.stream(properties.getRankingInputTopic());
+               KTable<String, User> users = builder.table(properties.getUsersInputTopic());
+               KStream<String, Ranking> rankings = builder.stream(properties.getRankingInputTopic());
 
                rankings
                                .join(users, (ranking, user) ->
-                                               UserRankingTO
+                                               UserRanking
                                                                .newBuilder()
                                                                .setFirstName(user.getFirstName())
                                                                .setLastName((user.getLastName()))
@@ -99,19 +102,19 @@ public class QueryStreamProcessor
                return Optional.of(location);
        }
 
-       public Optional<UserRanking> getUserRanking(String username)
+       public Optional<UserRankingResponse> getUserRanking(String username)
        {
                return
                                Optional
                                                .ofNullable(streams.store(storeParameters).get(username))
-                                               .map(userRankingTO -> UserRanking.of(
-                                                               userRankingTO.getFirstName().toString(),
-                                                               userRankingTO.getLastName().toString(),
-                                                               userRankingTO
+                                               .map(userRanking -> UserRankingResponse.of(
+                                                               userRanking.getFirstName(),
+                                                               userRanking.getLastName(),
+                                                               userRanking
                                                                                .getTop10()
                                                                                .stream()
-                                                                               .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount()))
-                                                                               .toArray(size -> new Entry[size])));
+                                                                               .map((Entry entry) -> UserRankingResponse.Entry.of(entry.getWord(), entry.getCount()))
+                                                                               .toArray(size -> new UserRankingResponse.Entry[size])));
        }
 
        @PostConstruct