1 package de.juplo.kafka.wordcount.counter;
 
   3 import lombok.extern.slf4j.Slf4j;
 
   4 import org.apache.kafka.common.serialization.Serdes;
 
   5 import org.apache.kafka.common.utils.Bytes;
 
   6 import org.apache.kafka.streams.*;
 
   7 import org.apache.kafka.streams.kstream.*;
 
   8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
   9 import org.apache.kafka.streams.state.WindowStore;
 
  10 import org.apache.kafka.streams.test.TestRecord;
 
  11 import org.junit.jupiter.api.Test;
 
  13 import java.time.Duration;
 
  14 import java.time.Instant;
 
  15 import java.util.Properties;
 
  16 import java.util.regex.Matcher;
 
  17 import java.util.regex.Pattern;
 
  18 import java.util.stream.Stream;
 
  20 import static org.assertj.core.api.Assertions.assertThat;
 
  24 public class AggregationTopologyTest
 
  26   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
 
  27   static final Duration ADVANCE = Duration.ofSeconds(3);
 
  28   static final TimeWindows WINDOWS = TimeWindows
 
  29       .ofSizeWithNoGrace(WINDOW_SIZE)
 
  36     StreamsBuilder builder = new StreamsBuilder();
 
  38     KStream<String, String> input = builder.stream(INPUT);
 
  44             (aggregate, value) -> aggregate + "-" + value,
 
  45             Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
 
  46                 .withKeySerde(Serdes.String())
 
  47                 .withValueSerde(Serdes.String()))
 
  48         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
 
  49         .toStream((k,v) -> k.toString())
 
  52     Topology topology = builder.build();
 
  53     log.info("Generated topology: {}", topology.describe());
 
  55     Properties properties = new Properties();
 
  56     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
 
  57     properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
 
  59     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
 
  61     in = testDriver.createInputTopic(
 
  63         Serdes.String().serializer(),
 
  64         Serdes.String().serializer());
 
  65     out = testDriver.createOutputTopic(
 
  67         Serdes.String().deserializer(),
 
  68         Serdes.String().deserializer());
 
  72     assertThatOutcomeIs();
 
  75     assertThatOutcomeIs(KeyValue.pair(windowFor(54), "A"));
 
  78     assertThatOutcomeIs();
 
  81     assertThatOutcomeIs();
 
  84     assertThatOutcomeIs(KeyValue.pair(windowFor(57), "A-B-C-D"));
 
  87     assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
 
  90     assertThatOutcomeIs(KeyValue.pair(windowFor(63), "A-B-C-D-E-F"));
 
  93     assertThatOutcomeIs();
 
  97         KeyValue.pair(windowFor(66), "D-E-F-G-H"),
 
  98         KeyValue.pair(windowFor(69), "E-F-G-H"),
 
  99         KeyValue.pair(windowFor(72), "G-H"),
 
 100         KeyValue.pair(windowFor(75), "H"));
 
 104         KeyValue.pair(windowFor(93), "I"),
 
 105         KeyValue.pair(windowFor(96), "I"),
 
 106         KeyValue.pair(windowFor(99), "I"));
 
 110         KeyValue.pair(windowFor(111), "J"),
 
 111         KeyValue.pair(windowFor(114), "J"),
 
 112         KeyValue.pair(windowFor(117), "J"),
 
 113         KeyValue.pair(windowFor(120), "J"));
 
 117         KeyValue.pair(windowFor(132), "K"),
 
 118         KeyValue.pair(windowFor(135), "K"),
 
 119         KeyValue.pair(windowFor(138), "K"));
 
  121     // Never received if no newer message is sent
 
 122     // KeyValue.pair(windowFor(153), "L")
 
 123     // KeyValue.pair(windowFor(156), "L")
 
 124     //KeyValue.pair(windowFor(159), "L")
 
 128   static final String INPUT = "TEST-INPUT";
 
 129   static final String OUTPUT = "TEST-OUTPUT";
 
 131   static final String KEY = "foo";
 
 134   TestInputTopic<String, String> in;
 
 135   TestOutputTopic<String, String> out;
 
 138   void sendAt(String value, int second)
 
 140     TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
 
 142         "Sending  @ {}: {} = {}",
 
 143         record.getRecordTime().toEpochMilli(),
 
 146     in.pipeInput(record);
 
 149   void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
 
 151     assertThat(outcome()).containsExactly(expected);
 
 154   Stream<KeyValue<Windowed<String>, String>> outcome()
 
 159         .peek(record -> log.info(
 
 160             "Received @ {}: {} = {}",
 
 161             record.getRecordTime().toEpochMilli(),
 
 164         .map(record -> KeyValue.pair(parse(record.key()), record.value()));
 
 168   static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
 
 170   Windowed<String> parse(String serialized)
 
 172     Matcher matcher = PATTERN.matcher(serialized);
 
 174     if (!matcher.matches())
 
 176       throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
 
 179     String key = matcher.group(1);
 
 180     String start = matcher.group(2);
 
 181     String end = matcher.group(3);
 
 183     Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
 
 185     return new Windowed<>(key, window);
 
 188   Windowed<String> windowFor(int second)
 
 190     Instant startTime = Instant.ofEpochSecond(second);
 
 191     Instant endTime = startTime.plus(WINDOW_SIZE);
 
 192     TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
 
 193     return new Windowed<>(KEY, window);