From 6730fbe1791d69b81e8f9476527e899239638e54 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Oct 2021 22:02:54 +0200 Subject: [PATCH] WIP --- pom.xml | 13 ++++ .../de/juplo/kafka/wordcount/query/Key.java | 13 ---- .../wordcount/query/QueryStreamProcessor.java | 59 ++++++++----------- .../juplo/kafka/wordcount/query/Ranking.java | 16 ----- .../de/juplo/kafka/wordcount/query/User.java | 22 ------- .../kafka/wordcount/query/UserRanking.java | 16 +++-- src/main/resources/avro/ranking.avsc | 25 ++++++++ src/main/resources/avro/user.avsc | 2 +- src/main/resources/avro/userranking.avsc | 31 ++++++++++ 9 files changed, 100 insertions(+), 97 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/query/Key.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/query/Ranking.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/query/User.java create mode 100644 src/main/resources/avro/ranking.avsc create mode 100644 src/main/resources/avro/userranking.avsc diff --git a/pom.xml b/pom.xml index e090dba..0174f15 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,7 @@ 0.33.0 11 2.8.0 + 6.2.1 @@ -32,6 +33,11 @@ org.apache.kafka kafka-streams + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + org.apache.avro avro @@ -111,4 +117,11 @@ + + + confluent + https://packages.confluent.io/maven/ + + + diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java deleted file mode 100644 index be34ba8..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Getter; -import lombok.Setter; - - -@Getter -@Setter -public class Key -{ - private String username; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 696e088..6dcf8b1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -2,6 +2,8 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -33,8 +35,7 @@ public class QueryStreamProcessor public final KafkaStreams streams; public final HostInfo hostInfo; public final String storeName = "rankingsByUsername"; - public final StoreQueryParameters> storeParameters; - public final ObjectMapper mapper; + public final StoreQueryParameters> storeParameters; public QueryStreamProcessor( @@ -44,36 +45,26 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(properties.getUsersInputTopic()); - KStream rankings = builder.stream(properties.getRankingInputTopic()); + KTable users = builder.table(properties.getUsersInputTopic()); + KStream rankings = builder.stream(properties.getRankingInputTopic()); rankings - .join(users, (rankingJson, userJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - User user = mapper.readValue(userJson, User.class); - - return mapper.writeValueAsString( - UserRanking.of( - user.getFirstName(), - user.getLastName(), - ranking.getEntries())); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + .join(users, (ranking, user) -> + UserRankingTO + .newBuilder() + .setFirstName(user.getFirstName()) + .setLastName((user.getLastName())) + .setTop10(ranking.getEntries()) + .build()) .toTable(Materialized.as(storeName)); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://schema-registry:9081/"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streams = new KafkaStreams(builder.build(), props); @@ -90,7 +81,6 @@ public class QueryStreamProcessor hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer()); storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());; - this.mapper = mapper; } public Optional getRedirect(String username) @@ -114,17 +104,14 @@ public class QueryStreamProcessor return Optional .ofNullable(streams.store(storeParameters).get(username)) - .map(json -> - { - try - { - return mapper.readValue(json, UserRanking.class); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }); + .map(userRankingTO -> UserRanking.of( + userRankingTO.getFirstName().toString(), + userRankingTO.getLastName().toString(), + userRankingTO + .getTop10() + .stream() + .map(entryTO -> Entry.of(entryTO.getWord().toString(), entryTO.getCount())) + .toArray(size -> new Entry[size]))); } @PostConstruct diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java deleted file mode 100644 index 69ae3aa..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Getter; -import lombok.Setter; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - - -@Getter -@Setter -public class Ranking -{ - private Entry[] entries; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java deleted file mode 100644 index fdc0a33..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/User.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - - -@Getter -@Setter -@ToString -@EqualsAndHashCode(of = "username") -public class User -{ - public enum Sex { FEMALE, MALE, OTHER } - - private String username; - - private String firstName; - private String lastName; - private Sex sex; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java index acffd5d..aa5d11e 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java @@ -1,16 +1,14 @@ package de.juplo.kafka.wordcount.query; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; +import lombok.Value; -@Getter -@Setter -@AllArgsConstructor(staticName = "of") +@Value +@RequiredArgsConstructor(staticName = "of") public class UserRanking { - private String firstName; - private String lastName; - private Entry[] top10; + private final String firstName; + private final String lastName; + private final Entry[] top10; } diff --git a/src/main/resources/avro/ranking.avsc b/src/main/resources/avro/ranking.avsc new file mode 100644 index 0000000..41fcdcf --- /dev/null +++ b/src/main/resources/avro/ranking.avsc @@ -0,0 +1,25 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.query", + "name": "RankingTO", + "fields": [ + { + "name": "entries", + "type": { + "type": "array", + "items": { + "name": "EntryTO", + "type": "record", + "fields":[ + { "name": "word", + "type": "string" + }, + { "name": "count", + "type": "long" + } + ] + } + } + } + ] +} diff --git a/src/main/resources/avro/user.avsc b/src/main/resources/avro/user.avsc index 833291b..72541c0 100644 --- a/src/main/resources/avro/user.avsc +++ b/src/main/resources/avro/user.avsc @@ -1,7 +1,7 @@ { "type": "record", - "name": "UserTO", "namespace": "de.juplo.kafka.wordcount.query", + "name": "UserTO", "fields": [ { "name": "username", "type": "string" diff --git a/src/main/resources/avro/userranking.avsc b/src/main/resources/avro/userranking.avsc new file mode 100644 index 0000000..591b517 --- /dev/null +++ b/src/main/resources/avro/userranking.avsc @@ -0,0 +1,31 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.query", + "name": "UserRankingTO", + "fields": [ + { + "name": "firstName", "type": "string", "default": "" + }, + { + "name": "lastName", "type": "string", "default": "" + }, + { + "name": "top10", + "type": { + "type": "array", + "items": { + "name": "EntryTO", + "type": "record", + "fields":[ + { "name": "word", + "type": "string" + }, + { "name": "count", + "type": "long" + } + ] + } + } + } + ] +} -- 2.20.1