package de.juplo.kafka.wordcount.query;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Entry;
+import de.juplo.kafka.wordcount.avro.Ranking;
+import de.juplo.kafka.wordcount.avro.User;
+import de.juplo.kafka.wordcount.avro.UserRanking;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.slf4j.Slf4j;
public final KafkaStreams streams;
public final HostInfo hostInfo;
public final String storeName = "rankingsByUsername";
- public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRankingTO>> storeParameters;
+ public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
public QueryStreamProcessor(
{
StreamsBuilder builder = new StreamsBuilder();
- KTable<String, UserTO> users = builder.table(properties.getUsersInputTopic());
- KStream<String, RankingTO> rankings = builder.stream(properties.getRankingInputTopic());
+ KTable<String, User> users = builder.table(properties.getUsersInputTopic());
+ KStream<String, Ranking> rankings = builder.stream(properties.getRankingInputTopic());
rankings
.join(users, (ranking, user) ->
- UserRankingTO
+ UserRanking
.newBuilder()
.setFirstName(user.getFirstName())
.setLastName((user.getLastName()))
return Optional.of(location);
}
- public Optional<UserRanking> getUserRanking(String username)
+ public Optional<UserRankingResponse> getUserRanking(String username)
{
return
Optional
.ofNullable(streams.store(storeParameters).get(username))
- .map(userRankingTO -> UserRanking.of(
- userRankingTO.getFirstName().toString(),
- userRankingTO.getLastName().toString(),
- userRankingTO
+ .map(userRanking -> UserRankingResponse.of(
+ userRanking.getFirstName(),
+ userRanking.getLastName(),
+ userRanking
.getTop10()
.stream()
- .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount()))
- .toArray(size -> new Entry[size])));
+ .map((Entry entry) -> UserRankingResponse.Entry.of(entry.getWord(), entry.getCount()))
+ .toArray(size -> new UserRankingResponse.Entry[size])));
}
@PostConstruct