From: Kai Moritz Date: Mon, 8 Jul 2024 07:40:54 +0000 (+0200) Subject: Remodeld example into a demonstration of a plain endless aggregation X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b8dd5069d3d8eea74d476888f2221334fd25c946;p=demos%2Fkafka%2Fwordcount Remodeld example into a demonstration of a plain endless aggregation --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index fd0ae85..a72d38f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,8 +3,9 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -12,7 +13,6 @@ import java.time.Duration; import java.time.Instant; import java.time.LocalTime; import java.time.ZoneId; -import java.time.temporal.ChronoField; import java.util.Properties; import java.util.stream.Stream; @@ -22,106 +22,101 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class AggregationTopologyTest { + static final Instant T0 = Instant.ofEpochSecond(60); + + @Test public void test() { StreamsBuilder builder = new StreamsBuilder(); - KStream left = builder.stream(LEFT); - KStream right = builder.stream(RIGHT); + KStream input = builder.stream(INPUT); - left - .join( - right, - (valueLeft, valueRight) -> valueLeft + "-" + valueRight, - JoinWindows.ofTimeDifferenceAndGrace( - Duration.ofSeconds(10), - Duration.ofSeconds(5))) - .to(JOINED); + input + .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) + .groupByKey() + .reduce((aggregate, value) -> aggregate + "-" + value) + .toStream() + .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) + .to(OUTPUT); Topology topology = builder.build(); + log.info("Generated topology: {}", topology.describe()); Properties properties = new Properties(); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0l); TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); - inLeft = testDriver.createInputTopic( - LEFT, - Serdes.String().serializer(), - Serdes.String().serializer()); - inRight = testDriver.createInputTopic( - RIGHT, + in = testDriver.createInputTopic( + INPUT, Serdes.String().serializer(), Serdes.String().serializer()); - outJoined = testDriver.createOutputTopic( - JOINED, + out = testDriver.createOutputTopic( + OUTPUT, Serdes.String().deserializer(), Serdes.String().deserializer()); - sendLeftAt("A", 3); - assertThatOutcomeIs(); + sendAt("A", 3); + assertThatOutcomeIs("A"); - sendRightAt("a", 4); - assertThatOutcomeIs("A-a"); + sendAt("B", 4); + assertThatOutcomeIs("A-B"); - sendLeftAt("B", 5); - assertThatOutcomeIs("B-a"); + sendAt("C", 5); + assertThatOutcomeIs("A-B-C"); - sendRightAt("b", 6); - assertThatOutcomeIs("A-b", "B-b"); + sendAt("D", 6); + assertThatOutcomeIs("A-B-C-D"); - sendLeftAt("C", 9); - assertThatOutcomeIs("C-a", "C-b"); + sendAt("E", 9); + assertThatOutcomeIs("A-B-C-D-E"); - sendRightAt("c", 10); - assertThatOutcomeIs("A-c", "B-c", "C-c"); + sendAt("F", 10); + assertThatOutcomeIs("A-B-C-D-E-F"); - sendRightAt("d", 14); - assertThatOutcomeIs("B-d", "C-d"); // ! + sendAt("G", 14); + assertThatOutcomeIs("A-B-C-D-E-F-G"); - sendLeftAt("D", 15); - assertThatOutcomeIs("D-b", "D-c", "D-d"); + sendAt("H", 15); + assertThatOutcomeIs("A-B-C-D-E-F-G-H"); - sendLeftAt("E", 40); - assertThatOutcomeIs(); + sendAt("I", 40); + assertThatOutcomeIs("A-B-C-D-E-F-G-H-I"); - sendLeftAt("F", 60); - assertThatOutcomeIs(); + sendAt("J", 60); + assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J"); - sendRightAt("f", 80); - assertThatOutcomeIs(); + sendAt("K", 80); + assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K"); - sendLeftAt("G", 100); - assertThatOutcomeIs(); + sendAt("L", 100); + assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L"); } - void sendLeftAt(String value, int second) - { - TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); - log.info( - "Sending LEFT at {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), - record.value()); - inLeft.pipeInput(record); - } + static final String INPUT = "TEST-INPUT"; + static final String OUTPUT = "TEST-OUTPUT"; + + static final String KEY = "foo"; + - void sendRightAt(String value, int second) + TestInputTopic in; + TestOutputTopic out; + + + void sendAt(String value, int second) { - TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + TestRecord record = new TestRecord<>(KEY, value, T0.plusSeconds(second)); log.info( - "Sending RIGHT at {}: {}", + "Sending at {} ({}): {}", + record.getRecordTime().toEpochMilli(), LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), record.value()); - inRight.pipeInput(record); - } - - TestRecord recordOf(String value, int second) - { - return new TestRecord<>("foo", value, T.plusSeconds(second)); + in.pipeInput(record); } void assertThatOutcomeIs(String... expected) @@ -131,25 +126,15 @@ public class AggregationTopologyTest Stream outcome() { - return outJoined + return out .readRecordsToList() .stream() .peek(record -> log.info( - "Receiving join for {}: {}", + "Received for {} ({}): {}={}", + record.getRecordTime().toEpochMilli(), LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.key(), record.value())) .map(record -> record.value()); } - - - static final String LEFT = "TEST-LEFT"; - static final String RIGHT = "TEST-RIGHT"; - static final String JOINED = "TEST-JOINED"; - - static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); - - - TestInputTopic inLeft; - TestInputTopic inRight; - TestOutputTopic outJoined; }