package de.juplo.kafka.wordcount.query;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Entry;
+import de.juplo.kafka.wordcount.avro.Ranking;
+import de.juplo.kafka.wordcount.avro.User;
+import de.juplo.kafka.wordcount.avro.UserRanking;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.slf4j.Slf4j;
public final KafkaStreams streams;
public final HostInfo hostInfo;
public final String storeName = "rankingsByUsername";
- public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRankingTO>> storeParameters;
+ public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
public QueryStreamProcessor(
{
StreamsBuilder builder = new StreamsBuilder();
- KTable<String, UserTO> users = builder.table(properties.getUsersInputTopic());
- KStream<String, RankingTO> rankings = builder.stream(properties.getRankingInputTopic());
+ KTable<String, User> users = builder.table(properties.getUsersInputTopic());
+ KStream<String, Ranking> rankings = builder.stream(properties.getRankingInputTopic());
rankings
.join(users, (ranking, user) ->
- UserRankingTO
+ UserRanking
.newBuilder()
.setFirstName(user.getFirstName())
.setLastName((user.getLastName()))
return Optional.of(location);
}
- public Optional<UserRanking> getUserRanking(String username)
+ public Optional<UserRankingResponse> getUserRanking(String username)
{
return
Optional
.ofNullable(streams.store(storeParameters).get(username))
- .map(userRankingTO -> UserRanking.of(
- userRankingTO.getFirstName().toString(),
- userRankingTO.getLastName().toString(),
- userRankingTO
+ .map(userRanking -> UserRankingResponse.of(
+ userRanking.getFirstName(),
+ userRanking.getLastName(),
+ userRanking
.getTop10()
.stream()
- .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount()))
- .toArray(size -> new Entry[size])));
+ .map((Entry entry) -> UserRankingResponse.Entry.of(entry.getWord(), entry.getCount()))
+ .toArray(size -> new UserRankingResponse.Entry[size])));
}
@PostConstruct
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+
+
+@Value
+@RequiredArgsConstructor(staticName = "of")
+public class UserRankingResponse
+{
+ private final String firstName;
+ private final String lastName;
+ private final Entry[] top10;
+
+
+ @Value(staticConstructor = "of")
+ public static class Entry
+ {
+ private final String word;
+ private final Long count;
+ }
+}
{
"type": "record",
- "namespace": "de.juplo.kafka.wordcount.query",
- "name": "UserTO",
+ "namespace": "de.juplo.kafka.wordcount.avro",
+ "name": "User",
"fields": [
{
"name": "username", "type": "string"
},
{ "name": "sex", "type":
{
- "type": "enum", "name": "SexTO",
+ "type": "enum", "name": "Sex",
"symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN"
},
"default": "UNKNOWN"