</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>popular</artifactId>
- <version>1.2.0</version>
+ <version>1.3.0</version>
<name>Wordcount-Popular-Words</name>
<description>Query stream-processor that gives access to the most popular words</description>
<properties>
import java.util.function.BiConsumer;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.TYPE;
import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
static final Clock CLOCK = Clock.fixed(
Clock.systemDefaultZone().instant(),
Clock.systemDefaultZone().getZone());
+
static final String PETER = "peter";
static final String KLAUS = "klaus";
static final String WORD_S = "s";
static final String WORD_BOÄH = "Boäh";
- static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO);
- static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT);
- static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH);
- static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH);
- static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S);
+ static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(TYPE, windowStart(), WORD_HALLO);
+ static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(TYPE, windowStart(), WORD_WELT);
+ static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(TYPE, windowStart(), WORD_BOÄH);
+ static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(TYPE, windowStart(), WORD_MÜSCH);
+ static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(TYPE, windowStart(), WORD_S);
private static String windowStart()
{
{
WindowedWord windowedWord = new WindowedWord();
- windowedWord.setTime(outputWindowedWord.getTime());
+ windowedWord.setChannel(outputWindowedWord.getChannel());
windowedWord.setKey(outputWindowedWord.getKey());
return windowedWord;