import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
+import java.time.Duration;
+import java.time.ZoneId;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
public PopularStreamProcessor streamProcessor(
PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
- KeyValueBytesStoreSupplier storeSupplier,
+ ZoneId zone,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
ConfigurableApplicationContext context)
{
PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
- storeSupplier);
+ zone,
+ windowBytesStoreSupplier,
+ keyValueBytesStoreSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
}
@Bean
- public KeyValueBytesStoreSupplier storeSupplier()
+ public ZoneId defaultZone()
{
- return Stores.persistentKeyValueStore(STORE_NAME);
+ return ZoneId.systemDefault();
+ }
+
+ @Bean
+ public WindowBytesStoreSupplier windowBytesStoreSupplier()
+ {
+ return Stores.persistentWindowStore(
+ KEY_VALUE_STORE_NAME,
+ Duration.ofSeconds(60),
+ Duration.ofSeconds(30),
+ true);
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(WINDOW_STORE_NAME);
}
}
package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
@Slf4j
public class PopularStreamProcessor
{
- public static final String STORE_NAME = "popular";
+ public static final String KEY_VALUE_STORE_NAME = "popular";
+ public static final String WINDOW_STORE_NAME = "popular-windows";
+ public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
public final KafkaStreams streams;
String inputTopic,
String outputTopic,
Properties properties,
- KeyValueBytesStoreSupplier storeSupplier)
+ ZoneId zone,
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
Topology topology = PopularStreamProcessor.buildTopology(
inputTopic,
outputTopic,
- storeSupplier);
+ zone,
+ windowBytesStoreSupplier,
+ keyValueBytesStoreSupplier);
streams = new KafkaStreams(topology, properties);
}
static Topology buildTopology(
String inputTopic,
String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
+ ZoneId zone,
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
- .map((key, word) -> new KeyValue<>(word, word))
+ .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
.groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
.count(
Materialized
- .<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+ .<Word, Long>as(windowBytesStoreSupplier)
+ .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+ .withValueSerde(Serdes.Long()))
+ .toStream()
+ .map((windowedWord, counter) -> new KeyValue<>(
+ WindowedWord.of(
+ ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
+ ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+ windowedWord.key().getWord()),
+ WordCounter.of(windowedWord.key().getWord(), counter)))
+ .toTable(
+ Materialized
+ .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+ .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+ .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
.toStream()
- .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
.to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
Topology topology = builder.build();
return topology;
}
- ReadOnlyKeyValueStore<Word, Long> getStore()
+ ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
{
- return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+ return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
}
public void start()
return new JsonSerde<>(Word.class);
}
- public static JsonSerde<Word> outKeySerde()
+ public static JsonSerde<WindowedWord> outKeySerde()
{
return serde(true);
}
private static String typeMappingsConfig()
{
- return typeMappingsConfig(Word.class, WordCounter.class);
+ return typeMappingsConfig(WindowedWord.class, WordCounter.class);
}
public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
--- /dev/null
+package de.juplo.kafka.wordcount.popular;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWord
+{
+ private String user;
+ private String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.popular;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.ZonedDateTime;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class WindowedWord
+{
+ ZonedDateTime start;
+ ZonedDateTime end;
+ String word;
+}
package de.juplo.kafka.wordcount.popular;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
public class Word
{
- private String user;
- private String word;
+ String word;
}
package de.juplo.kafka.wordcount.popular;
-import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@AllArgsConstructor(staticName = "of")
public class WordCounter
{
- String user;
String word;
long counter;
-
- public static WordCounter of(Word word, long counter)
- {
- return new WordCounter(word.getUser(), word.getWord(), counter);
- }
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
import static org.awaitility.Awaitility.await;
@Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
{
TestData
- .getInputMessages()
- .forEach(kv ->
+ .sendInputMessages((instant, kv) ->
{
try
{
- SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, null, instant.toEpochMilli(), kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
@DisplayName("Await the expected final output messages")
@Test
- public void testAwaitExpectedLastMessagesForUsers()
+ public void testAwaitExpectedLastMessagesForWord()
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
}
@Bean
- KeyValueBytesStoreSupplier storeSupplier()
+ WindowBytesStoreSupplier windowBytesStoreSupplier()
{
- return Stores.inMemoryKeyValueStore(STORE_NAME);
+ return Stores.inMemoryWindowStore(
+ WINDOW_STORE_NAME,
+ Duration.ofSeconds(60),
+ Duration.ofSeconds(30),
+ false);
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME);
}
}
}
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
+import java.time.Duration;
+import java.time.ZoneId;
import java.util.Map;
import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
@Slf4j
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
+ public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
static TopologyTestDriver testDriver;
Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
- Stores.inMemoryKeyValueStore(STORE_NAME));
+ ZONE,
+ Stores.inMemoryWindowStore(
+ WINDOW_STORE_NAME,
+ Duration.ofSeconds(60),
+ Duration.ofSeconds(30),
+ false),
+ Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
- TestData
- .getInputMessages()
- .forEach(kv -> in.pipeInput(kv.key, kv.value));
+ TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
receivedMessages = new LinkedMultiValueMap<>();
out
@Test
public void testExpectedState()
{
- KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+ KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
TestData.assertExpectedState(store);
}
import de.juplo.kafka.wordcount.splitter.InputWord;
import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
import de.juplo.kafka.wordcount.stats.OutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.function.BiConsumer;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
+@Slf4j
class TestData
{
+ static final Clock CLOCK = Clock.fixed(
+ Clock.systemDefaultZone().instant(),
+ Clock.systemDefaultZone().getZone());
static final String PETER = "peter";
static final String KLAUS = "klaus";
static final String WORD_S = "s";
static final String WORD_BOÄH = "Boäh";
- static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
- static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
- static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
- static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
- static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
+ static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
+ static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
+ static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
+ static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
+ static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
- private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
+ private static Instant windowStart()
{
- KeyValue.pair(
+ return windowBoundFor(CLOCK.instant(), 0);
+ }
+
+ private static Instant windowEnd()
+ {
+ return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
+ }
+
+ private static Instant windowBoundFor(Instant instant, int second)
+ {
+ return instantOfSecond(second, 0);
+ }
+
+ private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
+ {
+ TestMessage.of(
+ instantOfSecond(0),
InputUser.of(PETER),
InputWord.of(PETER, WORD_HALLO)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(13),
InputUser.of(KLAUS),
InputWord.of(KLAUS, WORD_MÜSCH)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(0),
InputUser.of(PETER),
InputWord.of(PETER, WORD_WELT)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(14),
InputUser.of(KLAUS),
InputWord.of(KLAUS, WORD_MÜSCH)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(14),
InputUser.of(KLAUS),
InputWord.of(KLAUS, WORD_S)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(14),
InputUser.of(PETER),
InputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(14),
InputUser.of(PETER),
InputWord.of(PETER, WORD_WELT)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(15),
InputUser.of(PETER),
InputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(15),
InputUser.of(KLAUS),
InputWord.of(KLAUS, WORD_S)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(29),
InputUser.of(PETER),
InputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
+ TestMessage.of(
+ instantOfSecond(20),
InputUser.of(KLAUS),
InputWord.of(KLAUS, WORD_S)),
};
- static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
+ private static Instant instantOfSecond(int second)
+ {
+ return instantOfSecond(second, 0);
+ }
+
+ private static Instant instantOfSecond(int second, int naonSeconds)
+ {
+ return ZonedDateTime
+ .ofInstant(CLOCK.instant(), CLOCK.getZone())
+ .withSecond(second)
+ .withNano(naonSeconds)
+ .toInstant();
+ }
+
+ private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
{
return Stream.of(TestData.INPUT_MESSAGES);
}
+ static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
+ {
+ getInputMessages().forEach(message ->
+ {
+ log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
+ consumer.accept(message.time, new KeyValue<>(message.key, message.value));
+ });
+ }
+
static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
expectedMessages().forEach(
static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
- assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
- assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
- assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
- assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
- assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
+ assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
+ assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
+ assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
+ assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
+ assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
}
private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
: messagesForUsers.get(word).size();
}
- static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+ static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
{
- assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
- assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
- assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
}
- private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
+ private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
{
- Word word = new Word();
+ WindowedWord windowedWord = new WindowedWord();
- word.setUser(testOutputWindowedWord.getUser());
- word.setWord(testOutputWindowedWord.getWord());
+ windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
+ windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
+ windowedWord.setWord(outputWindowedWord.getWord());
- return word;
+ return windowedWord;
}
static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
- assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
}
private static void assertWordCountEqualsWordCountFromLastMessage(
OutputWindowedWord word,
- Long counter)
+ WordCounter counter)
{
- OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
- word.getUser(),
+ OutputWordCounter outputWordCounter = OutputWordCounter.of(
word.getWord(),
- counter);
- assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+ counter.getCounter());
+ assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
}
private static void assertWordCountEqualsWordCountFromLastMessage(
private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- PETER_HALLO,
- OutputWordCounter.of(PETER, WORD_HALLO,1)),
+ WINDOWED_WORD_HALLO,
+ OutputWordCounter.of(WORD_HALLO,1)),
KeyValue.pair(
- KLAUS_MÜSCH,
- OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+ WINDOWED_WORD_MÜSCH,
+ OutputWordCounter.of(WORD_MÜSCH,1)),
KeyValue.pair(
- PETER_WELT,
- OutputWordCounter.of(PETER, WORD_WELT,1)),
+ WINDOWED_WORD_WELT,
+ OutputWordCounter.of(WORD_WELT,1)),
KeyValue.pair(
- KLAUS_MÜSCH,
- OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+ WINDOWED_WORD_MÜSCH,
+ OutputWordCounter.of(WORD_MÜSCH,2)),
KeyValue.pair(
- KLAUS_S,
- OutputWordCounter.of(KLAUS, WORD_S,1)),
+ WINDOWED_WORD_S,
+ OutputWordCounter.of(WORD_S,1)),
KeyValue.pair(
- PETER_BOÄH,
- OutputWordCounter.of(PETER, WORD_BOÄH,1)),
+ WINDOWED_WORD_BOÄH,
+ OutputWordCounter.of(WORD_BOÄH,1)),
KeyValue.pair(
- PETER_WELT,
- OutputWordCounter.of(PETER, WORD_WELT,2)),
+ WINDOWED_WORD_WELT,
+ OutputWordCounter.of(WORD_WELT,2)),
KeyValue.pair(
- PETER_BOÄH,
- OutputWordCounter.of(PETER, WORD_BOÄH,2)),
+ WINDOWED_WORD_BOÄH,
+ OutputWordCounter.of(WORD_BOÄH,2)),
KeyValue.pair(
- KLAUS_S,
- OutputWordCounter.of(KLAUS, WORD_S,2)),
+ WINDOWED_WORD_S,
+ OutputWordCounter.of(WORD_S,2)),
KeyValue.pair(
- PETER_BOÄH,
- OutputWordCounter.of(PETER, WORD_BOÄH,3)),
+ WINDOWED_WORD_BOÄH,
+ OutputWordCounter.of(WORD_BOÄH,3)),
KeyValue.pair(
- KLAUS_S,
- OutputWordCounter.of(KLAUS, WORD_S,3)),
+ WINDOWED_WORD_S,
+ OutputWordCounter.of(WORD_S,3)),
};
static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
--- /dev/null
+package de.juplo.kafka.wordcount.popular;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestMessage<K, V>
+{
+ Instant time;
+ K key;
+ V value;
+}
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.time.Instant;
+
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class OutputWindowedWord
{
- String user;
+ Instant start;
+ Instant end;
String word;
}
@AllArgsConstructor(staticName = "of")
public class OutputWordCounter
{
- String user;
String word;
long counter;
}