From 85e020f78fbfc81d55df746b0ed88dd8dedcfa2c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 6 Jul 2024 09:10:54 +0200 Subject: [PATCH] Added log-message, when sending a record --- .../counter/StreamStreamJoinTopologyTest.java | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java index 3812e86..39f59b7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java @@ -61,44 +61,63 @@ public class StreamStreamJoinTopologyTest Serdes.String().deserializer()); - inLeft.pipeInput(recordOf("A", 3)); + sendLeftAt("A", 3); assertThatOutcomeIs(); - inRight.pipeInput(recordOf("a", 4)); + sendRightAt("a", 4); assertThatOutcomeIs("A-a"); - inLeft.pipeInput(recordOf("B", 5)); + sendLeftAt("B", 5); assertThatOutcomeIs("B-a"); - inRight.pipeInput(recordOf("b", 6)); + sendRightAt("b", 6); assertThatOutcomeIs("A-b", "B-b"); - inLeft.pipeInput(recordOf("C", 9)); + sendLeftAt("C", 9); assertThatOutcomeIs("C-a", "C-b"); - inRight.pipeInput(recordOf("c", 10)); + sendRightAt("c", 10); assertThatOutcomeIs("A-c", "B-c", "C-c"); - inRight.pipeInput(recordOf("d", 14)); + sendRightAt("d", 14); assertThatOutcomeIs("B-d", "C-d"); // ! - inLeft.pipeInput(recordOf("D", 15)); + sendLeftAt("D", 15); assertThatOutcomeIs("D-b", "D-c", "D-d"); - inLeft.pipeInput(recordOf("E", 40)); + sendLeftAt("E", 40); assertThatOutcomeIs(); - inLeft.pipeInput(recordOf("F", 60)); + sendLeftAt("F", 60); assertThatOutcomeIs(); - inRight.pipeInput(recordOf("f", 80)); + sendRightAt("f", 80); assertThatOutcomeIs(); - inLeft.pipeInput(recordOf("G", 100)); + sendLeftAt("G", 100); assertThatOutcomeIs(); } + void sendLeftAt(String value, int second) + { + TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + log.info( + "Sending left: {} at time {}", + record.value(), + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())); + inLeft.pipeInput(record); + } + + void sendRightAt(String value, int second) + { + TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + log.info( + "Sending right: {} at time {}", + record.value(), + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())); + inRight.pipeInput(record); + } TestRecord recordOf(String value, int second) { -- 2.20.1