From 6fc61fb890315cdda2fad23bedb2044df3196aa8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Jun 2024 15:42:47 +0200 Subject: [PATCH] WIP:popular: 1.0.0 - Renamed packages and classes -- ALIGN --- .../popular/PopularStreamProcessor.java | 56 ++++++++----------- .../kafka/wordcount/popular/TestData.java | 4 +- 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 1c24e7a..4538c72 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -4,15 +4,15 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.*; import org.springframework.kafka.support.serializer.JsonSerde; import java.net.URI; +import java.time.Duration; +import java.time.Instant; import java.util.Optional; import java.util.Properties; @@ -47,37 +47,29 @@ public class PopularStreamProcessor } static Topology buildTopology( - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) + String inputTopic, + WindowBytesStoreSupplier windowBytesStoreSupplier, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder - .stream( - usersInputTopic, - Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) - .toTable( - Materialized - .as(userStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder - .stream(rankingInputTopic) - .map((word, value) -> new KeyValue<>(word.getUser(), value)); - - rankings - .join(users, (ranking, user) -> UserRanking.of( - user.getFirstName(), - user.getLastName(), - ranking.getEntries()), - Joined.keySerde(Serdes.String())) - .toTable( - Materialized - .as(rankingStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); + builder + .stream(inputTopic) + .map((word, counter) -> new KeyValue<>(word.getUser(), counter)) + .groupByKey() + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))) + .emitStrategy(EmitStrategy.onWindowClose()) + .aggregate( + () -> new Ranking(), + (word, counter, ranking) -> ranking, + Materialized.>as(windowBytesStoreSupplier)) + .toStream() + .map((windowedWord, ranking) -> + { + Instant endTime = windowedWord.window().endTime(); + return new KeyValue<>(windowedWord.key(), ranking); + }) + .toTable(Materialized.as(keyValueBytesStoreSupplier)); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 1bceecc..2d95897 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -18,7 +18,7 @@ class TestData static final TestWord PETER = TestWord.of("peter"); static final TestWord KLAUS = TestWord.of("klaus"); - static final Stream> getTop10Messages() + static final Stream> getTop10Messages() { return Stream.of(TOP10_MESSAGES); } @@ -73,7 +73,7 @@ class TestData wordCounter.setCount(testEntry.getCount()); return wordCounter; } - private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, -- 2.20.1