From bc012a0195c72c2e5dbd50b8c8e9c78d079407dd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 11:57:03 +0200 Subject: [PATCH] WIP --- .../PopularApplicationConfiguration.java | 15 +++++++---- .../wordcount/popular/PopularController.java | 5 ++-- .../popular/PopularStreamProcessor.java | 4 +-- .../popular/PopularApplicationIT.java | 2 +- .../PopularStreamProcessorTopologyTest.java | 26 +++++++++---------- .../kafka/wordcount/popular/TestData.java | 17 +++++------- 6 files changed, 33 insertions(+), 36 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java index 1997d4e..e52d1c9 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java @@ -6,6 +6,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -18,6 +19,7 @@ import org.springframework.kafka.support.serializer.JsonSerde; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.time.Duration; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -107,8 +109,7 @@ public class PopularApplicationConfiguration PopularStreamProcessor streamProcessor = new PopularStreamProcessor( streamProcessorProperties, applicationServer, - applicationProperties.getUsersInputTopic(), - applicationProperties.getRankingInputTopic(), + applicationProperties.getInputTopic(), userStoreSupplier, rankingStoreSupplier); @@ -127,13 +128,17 @@ public class PopularApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier userStoreSupplier() + public WindowBytesStoreSupplier windowedBytesStoreSupplier() { - return Stores.persistentKeyValueStore(USER_STORE_NAME); + return Stores.persistentWindowStore( + USER_STORE_NAME, + Duration.ofMinutes(1), + Duration.ofMinutes(1), + false); } @Bean - public KeyValueBytesStoreSupplier rankingStoreSupplier() + public KeyValueBytesStoreSupplier keyValueByptesStoreSupplier() { return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java index 1a3f120..8aa13b9 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java @@ -5,7 +5,6 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.net.URI; @@ -18,8 +17,8 @@ public class PopularController { private final PopularStreamProcessor processor; - @GetMapping("{username}") - ResponseEntity queryFor(@PathVariable String username) + @GetMapping + ResponseEntity getRanking() { Optional redirect = processor.getRedirect(username); if (redirect.isPresent()) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 4538c72..12f2cb5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -61,8 +61,8 @@ public class PopularStreamProcessor .emitStrategy(EmitStrategy.onWindowClose()) .aggregate( () -> new Ranking(), - (word, counter, ranking) -> ranking, - Materialized.>as(windowBytesStoreSupplier)) + (word, counter, ranking) -> ranking) //, + // Materialized.>as(windowBytesStoreSupplier)) .toStream() .map((windowedWord, ranking) -> { diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java index bf172e0..e953817 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -74,7 +74,7 @@ public class PopularApplicationIT .getUsersMessages() .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); TestData - .getTop10Messages() + .getMessages() .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java index 32fb0bf..ddbdea0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -2,9 +2,9 @@ package de.juplo.kafka.wordcount.popular; import de.juplo.kafka.wordcount.counter.TestRanking; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.counter.TestWordCounter; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; @@ -15,6 +15,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonSerializer; +import java.time.Duration; import java.util.Map; import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguration.serializationConfig; @@ -30,8 +31,7 @@ public class PopularStreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic top10In; - TestInputTopic userIn; + TestInputTopic in; @BeforeEach @@ -39,21 +39,19 @@ public class PopularStreamProcessorTopologyTest { Topology topology = PopularStreamProcessor.buildTopology( USERS_IN, - TOP10_IN, - Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryWindowStore( + USERS_STORE_NAME, + Duration.ofSeconds(1), + Duration.ofSeconds(1), + false), Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); - top10In = testDriver.createInputTopic( + in = testDriver.createInputTopic( TOP10_IN, jsonSerializer(TestWord.class, true), - jsonSerializer(TestRanking.class,false)); - - userIn = testDriver.createInputTopic( - USERS_IN, - new StringSerializer(), - jsonSerializer(TestUserData.class, false).noTypeInfo()); + jsonSerializer(TestWordCounter.class,false)); } @@ -64,8 +62,8 @@ public class PopularStreamProcessorTopologyTest .getUsersMessages() .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); TestData - .getTop10Messages() - .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); + .getMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 2d95897..cce6055 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -18,37 +18,32 @@ class TestData static final TestWord PETER = TestWord.of("peter"); static final TestWord KLAUS = TestWord.of("klaus"); - static final Stream> getTop10Messages() + static final Stream> getMessages() { return Stream.of(TOP10_MESSAGES); } - static final Stream> getUsersMessages() - { - return Stream.of(USERS_MESSAGES); - } - - static void assertExpectedState(Function function) + static void assertExpectedState(Function function) { assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); } - private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) + private static void assertRankingEqualsRankingFromLastMessage(String user, Ranking rankingJson) { assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); } - private static UserRanking getLastMessageFor(String user) + private static Ranking getLastMessageFor(String user) { - return getTop10Messages() + return getMessages() .filter(kv -> kv.key.getUser().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); } - private static UserRanking userRankingFor(String user, TestRanking testRanking) + private static Ranking userRankingFor(String user, Ranking testRanking) { TestUserData testUserData = getUsersMessages() .filter(kv -> kv.key.equals(user)) -- 2.20.1