From: Kai Moritz Date: Sun, 16 Jun 2024 09:48:24 +0000 (+0200) Subject: popular: 1.0.0 - Word are counted for hopping time-windows X-Git-Tag: popular-1.0.0 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=071bfe7637dd02088b0c6b22ad561b8b50677cf6;p=demos%2Fkafka%2Fwordcount popular: 1.0.0 - Word are counted for hopping time-windows --- diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java index 1c02197..bee13e7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -13,10 +14,13 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; +import java.time.Duration; +import java.time.ZoneId; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -59,14 +63,18 @@ public class PopularApplicationConfiguriation public PopularStreamProcessor streamProcessor( PopularApplicationProperties applicationProperties, Properties streamProcessorProperties, - KeyValueBytesStoreSupplier storeSupplier, + ZoneId zone, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, + WindowBytesStoreSupplier windowBytesStoreSupplier, ConfigurableApplicationContext context) { PopularStreamProcessor streamProcessor = new PopularStreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), streamProcessorProperties, - storeSupplier); + zone, + windowBytesStoreSupplier, + keyValueBytesStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -84,8 +92,24 @@ public class PopularApplicationConfiguriation } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public ZoneId defaultZone() { - return Stores.persistentKeyValueStore(STORE_NAME); + return ZoneId.systemDefault(); + } + + @Bean + public WindowBytesStoreSupplier windowBytesStoreSupplier() + { + return Stores.persistentWindowStore( + KEY_VALUE_STORE_NAME, + Duration.ofSeconds(60), + Duration.ofSeconds(30), + true); + } + + @Bean + public KeyValueBytesStoreSupplier keyValueBytesStoreSupplier() + { + return Stores.persistentKeyValueStore(WINDOW_STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 7bba240..9ba9ad2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -1,16 +1,22 @@ package de.juplo.kafka.wordcount.popular; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.kafka.support.serializer.JsonSerializer; +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -19,7 +25,9 @@ import java.util.stream.Collectors; @Slf4j public class PopularStreamProcessor { - public static final String STORE_NAME = "popular"; + public static final String KEY_VALUE_STORE_NAME = "popular"; + public static final String WINDOW_STORE_NAME = "popular-windows"; + public static final Duration WINDOW_SIZE = Duration.ofSeconds(30); public final KafkaStreams streams; @@ -29,12 +37,16 @@ public class PopularStreamProcessor String inputTopic, String outputTopic, Properties properties, - KeyValueBytesStoreSupplier storeSupplier) + ZoneId zone, + WindowBytesStoreSupplier windowBytesStoreSupplier, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { Topology topology = PopularStreamProcessor.buildTopology( inputTopic, outputTopic, - storeSupplier); + zone, + windowBytesStoreSupplier, + keyValueBytesStoreSupplier); streams = new KafkaStreams(topology, properties); } @@ -42,20 +54,35 @@ public class PopularStreamProcessor static Topology buildTopology( String inputTopic, String outputTopic, - KeyValueBytesStoreSupplier storeSupplier) + ZoneId zone, + WindowBytesStoreSupplier windowBytesStoreSupplier, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { StreamsBuilder builder = new StreamsBuilder(); builder .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) - .map((key, word) -> new KeyValue<>(word, word)) + .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord()))) .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE)) .count( Materialized - .as(storeSupplier) - .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed! + .as(windowBytesStoreSupplier) + .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo()) + .withValueSerde(Serdes.Long())) + .toStream() + .map((windowedWord, counter) -> new KeyValue<>( + WindowedWord.of( + ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone), + ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone), + windowedWord.key().getWord()), + WordCounter.of(windowedWord.key().getWord(), counter))) + .toTable( + Materialized + .as(keyValueBytesStoreSupplier) + .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo()) + .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo())) .toStream() - .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); Topology topology = builder.build(); @@ -64,9 +91,9 @@ public class PopularStreamProcessor return topology; } - ReadOnlyKeyValueStore getStore() + ReadOnlyKeyValueStore getStore() { - return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore())); } public void start() @@ -93,7 +120,7 @@ public class PopularStreamProcessor return new JsonSerde<>(Word.class); } - public static JsonSerde outKeySerde() + public static JsonSerde outKeySerde() { return serde(true); } @@ -114,7 +141,7 @@ public class PopularStreamProcessor private static String typeMappingsConfig() { - return typeMappingsConfig(Word.class, WordCounter.class); + return typeMappingsConfig(WindowedWord.class, WordCounter.class); } public static String typeMappingsConfig(Class wordClass, Class wordCounterClass) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/UserWord.java b/src/main/java/de/juplo/kafka/wordcount/popular/UserWord.java new file mode 100644 index 0000000..18f4726 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/UserWord.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.popular; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserWord +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java new file mode 100644 index 0000000..df988fb --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.wordcount.popular; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.ZonedDateTime; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class WindowedWord +{ + ZonedDateTime start; + ZonedDateTime end; + String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java index 92c1d6d..1cf09f3 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java @@ -1,13 +1,14 @@ package de.juplo.kafka.wordcount.popular; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data -@JsonIgnoreProperties(ignoreUnknown = true) +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") public class Word { - private String user; - private String word; + String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java index 39eebf0..c94ec0e 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.popular; -import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -8,15 +7,9 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor -@AllArgsConstructor(access = AccessLevel.PRIVATE) +@AllArgsConstructor(staticName = "of") public class WordCounter { - String user; String word; long counter; - - public static WordCounter of(Word word, long counter) - { - return new WordCounter(word.getUser(), word.getWord(), counter); - } } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java index a327389..20df7ac 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -7,6 +7,7 @@ import de.juplo.kafka.wordcount.stats.OutputWordCounter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -28,7 +29,8 @@ import java.time.Duration; import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT; -import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME; import static org.awaitility.Awaitility.await; @@ -67,12 +69,11 @@ public class PopularApplicationIT @Autowired KafkaTemplate kafkaTemplate) { TestData - .getInputMessages() - .forEach(kv -> + .sendInputMessages((instant, kv) -> { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, null, instant.toEpochMilli(), kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -109,7 +110,7 @@ public class PopularApplicationIT @DisplayName("Await the expected final output messages") @Test - public void testAwaitExpectedLastMessagesForUsers() + public void testAwaitExpectedLastMessagesForWord() { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) @@ -157,9 +158,19 @@ public class PopularApplicationIT } @Bean - KeyValueBytesStoreSupplier storeSupplier() + WindowBytesStoreSupplier windowBytesStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryWindowStore( + WINDOW_STORE_NAME, + Duration.ofSeconds(60), + Duration.ofSeconds(30), + false); + } + + @Bean + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier() + { + return Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java index 3e71d3e..1d62bef 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -20,10 +20,13 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.time.Duration; +import java.time.ZoneId; import java.util.Map; import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig; -import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME; @Slf4j @@ -31,6 +34,7 @@ public class PopularStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; + public static final ZoneId ZONE = ZoneId.of("Europe/Berlin"); static TopologyTestDriver testDriver; @@ -43,7 +47,13 @@ public class PopularStreamProcessorTopologyTest Topology topology = PopularStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore(STORE_NAME)); + ZONE, + Stores.inMemoryWindowStore( + WINDOW_STORE_NAME, + Duration.ofSeconds(60), + Duration.ofSeconds(30), + false), + Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -52,9 +62,7 @@ public class PopularStreamProcessorTopologyTest TestOutputTopic out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); - TestData - .getInputMessages() - .forEach(kv -> in.pipeInput(kv.key, kv.value)); + TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant)); receivedMessages = new LinkedMultiValueMap<>(); out @@ -88,7 +96,7 @@ public class PopularStreamProcessorTopologyTest @Test public void testExpectedState() { - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME); TestData.assertExpectedState(store); } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 892ea81..2634f5e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -4,18 +4,28 @@ import de.juplo.kafka.wordcount.splitter.InputUser; import de.juplo.kafka.wordcount.splitter.InputWord; import de.juplo.kafka.wordcount.stats.OutputWindowedWord; import de.juplo.kafka.wordcount.stats.OutputWordCounter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.time.Clock; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.function.BiConsumer; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE; import static org.assertj.core.api.Assertions.assertThat; +@Slf4j class TestData { + static final Clock CLOCK = Clock.fixed( + Clock.systemDefaultZone().instant(), + Clock.systemDefaultZone().getZone()); static final String PETER = "peter"; static final String KLAUS = "klaus"; @@ -25,54 +35,103 @@ class TestData static final String WORD_S = "s"; static final String WORD_BOÄH = "Boäh"; - static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO); - static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT); - static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH); - static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH); - static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S); + static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO); + static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT); + static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH); + static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH); + static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S); - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static Instant windowStart() { - KeyValue.pair( + return windowBoundFor(CLOCK.instant(), 0); + } + + private static Instant windowEnd() + { + return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart()); + } + + private static Instant windowBoundFor(Instant instant, int second) + { + return instantOfSecond(second, 0); + } + + private static final TestMessage[] INPUT_MESSAGES = new TestMessage[] + { + TestMessage.of( + instantOfSecond(0), InputUser.of(PETER), InputWord.of(PETER, WORD_HALLO)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(13), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(0), InputUser.of(PETER), InputWord.of(PETER, WORD_WELT)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_S)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(PETER), InputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(PETER), InputWord.of(PETER, WORD_WELT)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(15), InputUser.of(PETER), InputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(15), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_S)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(29), InputUser.of(PETER), InputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(20), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_S)), }; - static Stream> getInputMessages() + private static Instant instantOfSecond(int second) + { + return instantOfSecond(second, 0); + } + + private static Instant instantOfSecond(int second, int naonSeconds) + { + return ZonedDateTime + .ofInstant(CLOCK.instant(), CLOCK.getZone()) + .withSecond(second) + .withNano(naonSeconds) + .toInstant(); + } + + private static Stream> getInputMessages() { return Stream.of(TestData.INPUT_MESSAGES); } + static void sendInputMessages(BiConsumer> consumer) + { + getInputMessages().forEach(message -> + { + log.info("Sending@{}: {} -> {}", message.time, message.key, message.value); + consumer.accept(message.time, new KeyValue<>(message.key, message.value)); + }); + } + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( @@ -83,11 +142,11 @@ class TestData static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) { - assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); - assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); - assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages)); } private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap messagesForUsers) @@ -97,43 +156,43 @@ class TestData : messagesForUsers.get(word).size(); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S))); } - private static Word wordOf(OutputWindowedWord testOutputWindowedWord) + private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord) { - Word word = new Word(); + WindowedWord windowedWord = new WindowedWord(); - word.setUser(testOutputWindowedWord.getUser()); - word.setWord(testOutputWindowedWord.getWord()); + windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone())); + windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone())); + windowedWord.setWord(outputWindowedWord.getWord()); - return word; + return windowedWord; } static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages)); } private static void assertWordCountEqualsWordCountFromLastMessage( OutputWindowedWord word, - Long counter) + WordCounter counter) { - OutputWordCounter testOutputWordCounter = OutputWordCounter.of( - word.getUser(), + OutputWordCounter outputWordCounter = OutputWordCounter.of( word.getWord(), - counter); - assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + counter.getCounter()); + assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter); } private static void assertWordCountEqualsWordCountFromLastMessage( @@ -161,38 +220,38 @@ class TestData private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER_HALLO, - OutputWordCounter.of(PETER, WORD_HALLO,1)), + WINDOWED_WORD_HALLO, + OutputWordCounter.of(WORD_HALLO,1)), KeyValue.pair( - KLAUS_MÜSCH, - OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), + WINDOWED_WORD_MÜSCH, + OutputWordCounter.of(WORD_MÜSCH,1)), KeyValue.pair( - PETER_WELT, - OutputWordCounter.of(PETER, WORD_WELT,1)), + WINDOWED_WORD_WELT, + OutputWordCounter.of(WORD_WELT,1)), KeyValue.pair( - KLAUS_MÜSCH, - OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), + WINDOWED_WORD_MÜSCH, + OutputWordCounter.of(WORD_MÜSCH,2)), KeyValue.pair( - KLAUS_S, - OutputWordCounter.of(KLAUS, WORD_S,1)), + WINDOWED_WORD_S, + OutputWordCounter.of(WORD_S,1)), KeyValue.pair( - PETER_BOÄH, - OutputWordCounter.of(PETER, WORD_BOÄH,1)), + WINDOWED_WORD_BOÄH, + OutputWordCounter.of(WORD_BOÄH,1)), KeyValue.pair( - PETER_WELT, - OutputWordCounter.of(PETER, WORD_WELT,2)), + WINDOWED_WORD_WELT, + OutputWordCounter.of(WORD_WELT,2)), KeyValue.pair( - PETER_BOÄH, - OutputWordCounter.of(PETER, WORD_BOÄH,2)), + WINDOWED_WORD_BOÄH, + OutputWordCounter.of(WORD_BOÄH,2)), KeyValue.pair( - KLAUS_S, - OutputWordCounter.of(KLAUS, WORD_S,2)), + WINDOWED_WORD_S, + OutputWordCounter.of(WORD_S,2)), KeyValue.pair( - PETER_BOÄH, - OutputWordCounter.of(PETER, WORD_BOÄH,3)), + WINDOWED_WORD_BOÄH, + OutputWordCounter.of(WORD_BOÄH,3)), KeyValue.pair( - KLAUS_S, - OutputWordCounter.of(KLAUS, WORD_S,3)), + WINDOWED_WORD_S, + OutputWordCounter.of(WORD_S,3)), }; static MultiValueMap expectedMessages() diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestMessage.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestMessage.java new file mode 100644 index 0000000..0c777c9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestMessage.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.wordcount.popular; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestMessage +{ + Instant time; + K key; + V value; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java index 83288fa..1589ea0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java @@ -4,12 +4,15 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import java.time.Instant; + @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") public class OutputWindowedWord { - String user; + Instant start; + Instant end; String word; } diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java index b5d2499..f132707 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java @@ -10,7 +10,6 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class OutputWordCounter { - String user; String word; long counter; }