* `WindowedWord` only stores the start-time of the window.
* `WindowedWord` stores the seconds-since-epoce as string.
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
-import java.time.Duration;
import java.time.ZoneId;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
public PopularStreamProcessor streamProcessor(
PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
- ZoneId zone,
KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
WindowBytesStoreSupplier windowBytesStoreSupplier,
ConfigurableApplicationContext context)
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
- zone,
windowBytesStoreSupplier,
keyValueBytesStoreSupplier);
return streamProcessor;
}
- @Bean
- public ZoneId defaultZone()
- {
- return ZoneId.systemDefault();
- }
-
@Bean
public WindowBytesStoreSupplier windowBytesStoreSupplier()
{
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.time.Duration;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
String inputTopic,
String outputTopic,
Properties properties,
- ZoneId zone,
WindowBytesStoreSupplier windowBytesStoreSupplier,
KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
Topology topology = PopularStreamProcessor.buildTopology(
inputTopic,
outputTopic,
- zone,
windowBytesStoreSupplier,
keyValueBytesStoreSupplier);
static Topology buildTopology(
String inputTopic,
String outputTopic,
- ZoneId zone,
WindowBytesStoreSupplier windowBytesStoreSupplier,
KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
.peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
.map((windowedWord, counter) -> new KeyValue<>(
WindowedWord.of(
- ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
- ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+ Long.toString(windowedWord.window().startTime().getEpochSecond()),
windowedWord.key().getWord()),
WordCounter.of(windowedWord.key().getWord(), counter)))
.peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
import lombok.Data;
import lombok.NoArgsConstructor;
-import java.time.ZonedDateTime;
-
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class WindowedWord
{
- ZonedDateTime start;
- ZonedDateTime end;
+ String time;
String key;
}
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import java.time.Duration;
-import java.time.ZoneId;
import java.util.Map;
import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
- public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
static TopologyTestDriver testDriver;
Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
- ZONE,
Stores.inMemoryWindowStore(
WINDOW_STORE_NAME,
WINDOW_SIZE.multipliedBy(2),
static final String WORD_S = "s";
static final String WORD_BOÄH = "Boäh";
- static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
- static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
- static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
- static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
- static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
+ static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO);
+ static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT);
+ static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH);
+ static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH);
+ static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S);
- private static Instant windowStart()
+ private static String windowStart()
{
- return windowBoundFor(0);
+ return toEpochSecond(windowBoundFor(0));
}
- private static Instant windowEnd()
+ private static String toEpochSecond(Instant instant)
{
- return windowBoundFor(WINDOW_SIZE.toSecondsPart());
+ return Long.toString(instant.getEpochSecond());
}
private static Instant windowBoundFor(int second)
{
WindowedWord windowedWord = new WindowedWord();
- windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
- windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
+ windowedWord.setTime(outputWindowedWord.getTime());
windowedWord.setKey(outputWindowedWord.getKey());
return windowedWord;
import lombok.Data;
import lombok.NoArgsConstructor;
-import java.time.Instant;
-
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class OutputWindowedWord
{
- Instant start;
- Instant end;
+ String time;
String key;
}