import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
-import java.time.temporal.ChronoField;
import java.util.Properties;
import java.util.stream.Stream;
@Slf4j
public class AggregationTopologyTest
{
+ static final Instant T0 = Instant.ofEpochSecond(60);
+
+
@Test
public void test()
{
StreamsBuilder builder = new StreamsBuilder();
- KStream<String, String> left = builder.stream(LEFT);
- KStream<String, String> right = builder.stream(RIGHT);
+ KStream<String, String> input = builder.stream(INPUT);
- left
- .join(
- right,
- (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
- JoinWindows.ofTimeDifferenceAndGrace(
- Duration.ofSeconds(10),
- Duration.ofSeconds(5)))
- .to(JOINED);
+ input
+ .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
+ .groupByKey()
+ .reduce((aggregate, value) -> aggregate + "-" + value)
+ .toStream()
+ .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
+ .to(OUTPUT);
Topology topology = builder.build();
+ log.info("Generated topology: {}", topology.describe());
Properties properties = new Properties();
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ properties.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0l);
TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
- inLeft = testDriver.createInputTopic(
- LEFT,
- Serdes.String().serializer(),
- Serdes.String().serializer());
- inRight = testDriver.createInputTopic(
- RIGHT,
+ in = testDriver.createInputTopic(
+ INPUT,
Serdes.String().serializer(),
Serdes.String().serializer());
- outJoined = testDriver.createOutputTopic(
- JOINED,
+ out = testDriver.createOutputTopic(
+ OUTPUT,
Serdes.String().deserializer(),
Serdes.String().deserializer());
- sendLeftAt("A", 3);
- assertThatOutcomeIs();
+ sendAt("A", 3);
+ assertThatOutcomeIs("A");
- sendRightAt("a", 4);
- assertThatOutcomeIs("A-a");
+ sendAt("B", 4);
+ assertThatOutcomeIs("A-B");
- sendLeftAt("B", 5);
- assertThatOutcomeIs("B-a");
+ sendAt("C", 5);
+ assertThatOutcomeIs("A-B-C");
- sendRightAt("b", 6);
- assertThatOutcomeIs("A-b", "B-b");
+ sendAt("D", 6);
+ assertThatOutcomeIs("A-B-C-D");
- sendLeftAt("C", 9);
- assertThatOutcomeIs("C-a", "C-b");
+ sendAt("E", 9);
+ assertThatOutcomeIs("A-B-C-D-E");
- sendRightAt("c", 10);
- assertThatOutcomeIs("A-c", "B-c", "C-c");
+ sendAt("F", 10);
+ assertThatOutcomeIs("A-B-C-D-E-F");
- sendRightAt("d", 14);
- assertThatOutcomeIs("B-d", "C-d"); // !
+ sendAt("G", 14);
+ assertThatOutcomeIs("A-B-C-D-E-F-G");
- sendLeftAt("D", 15);
- assertThatOutcomeIs("D-b", "D-c", "D-d");
+ sendAt("H", 15);
+ assertThatOutcomeIs("A-B-C-D-E-F-G-H");
- sendLeftAt("E", 40);
- assertThatOutcomeIs();
+ sendAt("I", 40);
+ assertThatOutcomeIs("A-B-C-D-E-F-G-H-I");
- sendLeftAt("F", 60);
- assertThatOutcomeIs();
+ sendAt("J", 60);
+ assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J");
- sendRightAt("f", 80);
- assertThatOutcomeIs();
+ sendAt("K", 80);
+ assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K");
- sendLeftAt("G", 100);
- assertThatOutcomeIs();
+ sendAt("L", 100);
+ assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L");
}
- void sendLeftAt(String value, int second)
- {
- TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
- log.info(
- "Sending LEFT at {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
- record.value());
- inLeft.pipeInput(record);
- }
+ static final String INPUT = "TEST-INPUT";
+ static final String OUTPUT = "TEST-OUTPUT";
+
+ static final String KEY = "foo";
+
- void sendRightAt(String value, int second)
+ TestInputTopic<String, String> in;
+ TestOutputTopic<String, String> out;
+
+
+ void sendAt(String value, int second)
{
- TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ TestRecord<String, String> record = new TestRecord<>(KEY, value, T0.plusSeconds(second));
log.info(
- "Sending RIGHT at {}: {}",
+ "Sending at {} ({}): {}",
+ record.getRecordTime().toEpochMilli(),
LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
record.value());
- inRight.pipeInput(record);
- }
-
- TestRecord<String, String> recordOf(String value, int second)
- {
- return new TestRecord<>("foo", value, T.plusSeconds(second));
+ in.pipeInput(record);
}
void assertThatOutcomeIs(String... expected)
Stream<String> outcome()
{
- return outJoined
+ return out
.readRecordsToList()
.stream()
.peek(record -> log.info(
- "Receiving join for {}: {}",
+ "Received for {} ({}): {}={}",
+ record.getRecordTime().toEpochMilli(),
LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.key(),
record.value()))
.map(record -> record.value());
}
-
-
- static final String LEFT = "TEST-LEFT";
- static final String RIGHT = "TEST-RIGHT";
- static final String JOINED = "TEST-JOINED";
-
- static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-
- TestInputTopic<String, String> inLeft;
- TestInputTopic<String, String> inRight;
- TestOutputTopic<String, String> outJoined;
}