import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.EmitStrategy;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
public class AggregationTopologyTest
{
+ static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10));
+
+
@Test
public void test()
{
input
.peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
.groupByKey()
+ .windowedBy(WINDOWS)
+ .emitStrategy(EmitStrategy.onWindowUpdate())
.reduce((aggregate, value) -> aggregate + "-" + value)
- .toStream()
+ .toStream((k,v) -> k.toString())
.peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
.to(OUTPUT);
sendAt("A", 63);
- assertThatOutcomeIs("A");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A"));
sendAt("B", 64);
- assertThatOutcomeIs("A-B");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B"));
sendAt("C", 65);
- assertThatOutcomeIs("A-B-C");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C"));
sendAt("D", 66);
- assertThatOutcomeIs("A-B-C-D");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D"));
sendAt("E", 69);
- assertThatOutcomeIs("A-B-C-D-E");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
sendAt("F", 70);
- assertThatOutcomeIs("A-B-C-D-E-F");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F"));
sendAt("G", 74);
- assertThatOutcomeIs("A-B-C-D-E-F-G");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G"));
sendAt("H", 75);
- assertThatOutcomeIs("A-B-C-D-E-F-G-H");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H"));
sendAt("I", 100);
- assertThatOutcomeIs("A-B-C-D-E-F-G-H-I");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I"));
sendAt("J", 120);
- assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J"));
sendAt("K", 140);
- assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K"));
sendAt("L", 160);
- assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L");
+ assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L"));
}
in.pipeInput(record);
}
- void assertThatOutcomeIs(String... expected)
+ void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
{
assertThat(outcome()).containsExactly(expected);
}
- Stream<String> outcome()
+ Stream<KeyValue<Windowed<String>, String>> outcome()
{
return out
.readRecordsToList()
LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
record.key(),
record.value()))
- .map(record -> record.value());
+ .map(record -> KeyValue.pair(parse(record.key()), record.value()));
+ }
+
+
+ static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
+
+ Windowed<String> parse(String serialized)
+ {
+ Matcher matcher = PATTERN.matcher(serialized);
+
+ if (!matcher.matches())
+ {
+ throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
+ }
+
+ String key = matcher.group(1);
+ String start = matcher.group(2);
+ String end = matcher.group(3);
+
+ Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
+
+ return new Windowed<>(key, window);
+ }
+
+ Windowed<String> windowFor(int second)
+ {
+ Instant time = Instant.ofEpochSecond(second);
+ long timestamp = time.toEpochMilli();
+ return new Windowed<>(KEY, WINDOWS.windowsFor(timestamp).values().stream().findFirst().get());
}
}