From: Kai Moritz Date: Sat, 13 Jul 2024 09:14:43 +0000 (+0200) Subject: Aligned the join-example withe the aggregation-examples X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=60d4b1ea7a6b0e13cc0c4a16e31d6aa5a2d694ad;p=demos%2Fkafka%2Fwordcount Aligned the join-example withe the aggregation-examples --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/JoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/JoinTopologyTest.java index 9ca35bf..1b0b74b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/JoinTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/JoinTopologyTest.java @@ -10,9 +10,6 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; -import java.time.temporal.ChronoField; import java.util.Properties; import java.util.stream.Stream; @@ -22,6 +19,11 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class JoinTopologyTest { + static final Duration WINDOW_SIZE = Duration.ofSeconds(10); + static final Duration GRACE_PERIOD = Duration.ofSeconds(5); + static final JoinWindows WINDOWS = JoinWindows.ofTimeDifferenceAndGrace(WINDOW_SIZE, GRACE_PERIOD); + + @Test public void test() { @@ -34,12 +36,11 @@ public class JoinTopologyTest .join( right, (valueLeft, valueRight) -> valueLeft + "-" + valueRight, - JoinWindows.ofTimeDifferenceAndGrace( - Duration.ofSeconds(10), - Duration.ofSeconds(5))) - .to(JOINED); + WINDOWS) + .to(OUTPUT); Topology topology = builder.build(); + log.info("Generated topology: {}", topology.describe()); Properties properties = new Properties(); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); @@ -55,75 +56,84 @@ public class JoinTopologyTest RIGHT, Serdes.String().serializer(), Serdes.String().serializer()); - outJoined = testDriver.createOutputTopic( - JOINED, + out = testDriver.createOutputTopic( + OUTPUT, Serdes.String().deserializer(), Serdes.String().deserializer()); - sendLeftAt("A", 3); + sendLeftAt("A", 63); assertThatOutcomeIs(); - sendRightAt("a", 4); + sendRightAt("a", 64); assertThatOutcomeIs("A-a"); - sendLeftAt("B", 5); + sendLeftAt("B", 65); assertThatOutcomeIs("B-a"); - sendRightAt("b", 6); + sendRightAt("b", 66); assertThatOutcomeIs("A-b", "B-b"); - sendLeftAt("C", 9); + sendLeftAt("C", 69); assertThatOutcomeIs("C-a", "C-b"); - sendRightAt("c", 10); + sendRightAt("c", 70); assertThatOutcomeIs("A-c", "B-c", "C-c"); - sendRightAt("d", 14); + sendRightAt("d", 74); assertThatOutcomeIs("B-d", "C-d"); // ! - sendLeftAt("D", 15); + sendLeftAt("D", 75); assertThatOutcomeIs("D-b", "D-c", "D-d"); - sendLeftAt("E", 40); + sendLeftAt("E", 100); assertThatOutcomeIs(); - sendLeftAt("F", 60); + sendLeftAt("F", 120); assertThatOutcomeIs(); - sendRightAt("f", 80); + sendRightAt("f", 140); assertThatOutcomeIs(); - sendLeftAt("G", 100); + sendLeftAt("G", 160); assertThatOutcomeIs(); } + static final String LEFT = "TEST-LEFT"; + static final String RIGHT = "TEST-RIGHT"; + static final String OUTPUT = "TEST-OUTPUT"; + + static final String KEY = "foo"; + + + TestInputTopic inLeft; + TestInputTopic inRight; + TestOutputTopic out; + + void sendLeftAt(String value, int second) { - TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + TestRecord record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second)); log.info( - "Sending LEFT at {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + "Sending LEFT @ {}: {}={}", + record.getRecordTime().toEpochMilli(), + record.key(), record.value()); inLeft.pipeInput(record); } void sendRightAt(String value, int second) { - TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + TestRecord record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second)); log.info( - "Sending RIGHT at {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + "Sending RIGHT @ {}: {}={}", + record.getRecordTime().toEpochMilli(), + record.key(), record.value()); inRight.pipeInput(record); } - TestRecord recordOf(String value, int second) - { - return new TestRecord<>("foo", value, T.plusSeconds(second)); - } - void assertThatOutcomeIs(String... expected) { assertThat(outcome()).containsExactly(expected); @@ -131,25 +141,14 @@ public class JoinTopologyTest Stream outcome() { - return outJoined + return out .readRecordsToList() .stream() .peek(record -> log.info( - "Receiving join for {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + "Received join @ {}: {}={}", + record.getRecordTime().toEpochMilli(), + record.key(), record.value())) .map(record -> record.value()); } - - - static final String LEFT = "TEST-LEFT"; - static final String RIGHT = "TEST-RIGHT"; - static final String JOINED = "TEST-JOINED"; - - static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); - - - TestInputTopic inLeft; - TestInputTopic inRight; - TestOutputTopic outJoined; }