WIP:popular: 1.0.0 - Renamed packages and classes -- ALIGN
authorKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 13:42:47 +0000 (15:42 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 13:42:47 +0000 (15:42 +0200)
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java

index 1c24e7a..4538c72 100644 (file)
@@ -4,15 +4,15 @@ import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.*;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -47,37 +47,29 @@ public class PopularStreamProcessor
        }
 
        static Topology buildTopology(
-                       String usersInputTopic,
-                       String rankingInputTopic,
-                       KeyValueBytesStoreSupplier userStoreSupplier,
-                       KeyValueBytesStoreSupplier rankingStoreSupplier)
+                       String inputTopic,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KTable<String, User> users = builder
-                               .stream(
-                                               usersInputTopic,
-                                               Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
-                               .toTable(
-                                               Materialized
-                                                               .<String, User>as(userStoreSupplier)
-                                                               .withKeySerde(Serdes.String())
-                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
-               KStream<String, Ranking> rankings = builder
-                               .<Word, Ranking>stream(rankingInputTopic)
-                               .map((word, value) -> new KeyValue<>(word.getUser(), value));
-
-               rankings
-                               .join(users, (ranking, user) -> UserRanking.of(
-                                               user.getFirstName(),
-                                               user.getLastName(),
-                                               ranking.getEntries()),
-                                               Joined.keySerde(Serdes.String()))
-                               .toTable(
-                                               Materialized
-                                                               .<String, UserRanking>as(rankingStoreSupplier)
-                                                               .withKeySerde(Serdes.String())
-                                                               .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+               builder
+                               .<Word, WordCounter>stream(inputTopic)
+                               .map((word, counter) -> new KeyValue<>(word.getUser(), counter))
+                               .groupByKey()
+                               .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)))
+                               .emitStrategy(EmitStrategy.onWindowClose())
+                               .aggregate(
+                                               () -> new Ranking(),
+                                               (word, counter, ranking) -> ranking,
+                                               Materialized.<String, Ranking, WindowStore<Bytes, byte[]>>as(windowBytesStoreSupplier))
+                               .toStream()
+                               .map((windowedWord, ranking) ->
+                               {
+                                       Instant endTime = windowedWord.window().endTime();
+                                       return new KeyValue<>(windowedWord.key(), ranking);
+                               })
+                               .toTable(Materialized.as(keyValueBytesStoreSupplier));
 
                Topology topology = builder.build();
                log.info("\n\n{}", topology.describe());
index 1bceecc..2d95897 100644 (file)
@@ -18,7 +18,7 @@ class TestData
        static final TestWord PETER = TestWord.of("peter");
        static final TestWord KLAUS = TestWord.of("klaus");
 
-       static final Stream<KeyValue<TestWord, TestRanking>> getTop10Messages()
+       static final Stream<KeyValue<TestWord, TestWordCounter>> getTop10Messages()
        {
                return Stream.of(TOP10_MESSAGES);
        }
@@ -73,7 +73,7 @@ class TestData
                wordCounter.setCount(testEntry.getCount());
                return wordCounter;
        }
-       private static KeyValue<TestWord, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       private static KeyValue<TestWord, TestWordCounter>[] TOP10_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
                                        PETER,