From d7bacb270dee43f795590236e22e36823daba4d7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 5 Jul 2024 23:06:32 +0200 Subject: [PATCH] Simplified demo-code: removed unnecessary keywords and performed DRY --- .../counter/StreamStreamJoinTopologyTest.java | 61 ++++++++++++------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java index 4c26e6c..8d178d5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.JoinWindows; @@ -9,6 +10,8 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.time.Instant; +import java.time.LocalTime; +import java.time.ZoneId; import java.time.temporal.ChronoField; import java.util.Properties; import java.util.stream.Stream; @@ -16,30 +19,21 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +@Slf4j public class StreamStreamJoinTopologyTest { - static final String LEFT = "TEST-LEFT"; - static final String RIGHT = "TEST-RIGHT"; - static final String JOINED = "TEST-JOINED"; - - static TestInputTopic inLeft; - static TestInputTopic inRight; - static TestOutputTopic outJoined; - - static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); - - static final TestRecord A = new TestRecord<>("foo", "A", T.minusSeconds(3)); - static final TestRecord a = new TestRecord<>("foo", "a", T.minusSeconds(4)); - static final TestRecord B = new TestRecord<>("foo", "B", T.minusSeconds(5)); - static final TestRecord b = new TestRecord<>("foo", "b", T.minusSeconds(6)); - static final TestRecord C = new TestRecord<>("foo", "C", T.minusSeconds(9)); - static final TestRecord c = new TestRecord<>("foo", "c", T.minusSeconds(10)); - static final TestRecord D = new TestRecord<>("foo", "D", T.minusSeconds(15)); - static final TestRecord d = new TestRecord<>("foo", "d", T.minusSeconds(14)); - static final TestRecord E = new TestRecord<>("foo", "E", T.minusSeconds(40)); - static final TestRecord F = new TestRecord<>("foo", "F", T.minusSeconds(60)); - static final TestRecord f = new TestRecord<>("foo", "f", T.minusSeconds(80)); - static final TestRecord G = new TestRecord<>("foo", "G", T.minusSeconds(100)); + TestRecord A = recordOf("A", 3); + TestRecord a = recordOf("a", 4); + TestRecord B = recordOf("B", 5); + TestRecord b = recordOf("b", 6); + TestRecord C = recordOf("C", 9); + TestRecord c = recordOf("c", 10); + TestRecord D = recordOf("D", 15); + TestRecord d = recordOf("d", 14); + TestRecord E = recordOf("E", 40); + TestRecord F = recordOf("F", 60); + TestRecord f = recordOf("f", 80); + TestRecord G = recordOf("G", 100); @Test @@ -115,16 +109,39 @@ public class StreamStreamJoinTopologyTest assertThat(noResult()); } + + boolean noResult() { return outJoined.isEmpty(); } + TestRecord recordOf(String value, int second) + { + return new TestRecord<>("foo", value, T.minusSeconds(second)); + } + Stream outcome() { return outJoined .readRecordsToList() .stream() + .peek(record -> log.info( + "Received join-outcome: {} for time {}", + record.value(), + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()))) .map(record -> record.value()); } + + + static final String LEFT = "TEST-LEFT"; + static final String RIGHT = "TEST-RIGHT"; + static final String JOINED = "TEST-JOINED"; + + static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); + + + TestInputTopic inLeft; + TestInputTopic inRight; + TestOutputTopic outJoined; } -- 2.20.1