package de.juplo.kafka.wordcount.counter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.JoinWindows;
import java.time.Duration;
import java.time.Instant;
+import java.time.LocalTime;
+import java.time.ZoneId;
import java.time.temporal.ChronoField;
import java.util.Properties;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
+@Slf4j
public class StreamStreamJoinTopologyTest
{
- static final String LEFT = "TEST-LEFT";
- static final String RIGHT = "TEST-RIGHT";
- static final String JOINED = "TEST-JOINED";
-
- static TestInputTopic<String, String> inLeft;
- static TestInputTopic<String, String> inRight;
- static TestOutputTopic<String, String> outJoined;
-
- static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
- static final TestRecord<String, String> A = new TestRecord<>("foo", "A", T.minusSeconds(3));
- static final TestRecord<String, String> a = new TestRecord<>("foo", "a", T.minusSeconds(4));
- static final TestRecord<String, String> B = new TestRecord<>("foo", "B", T.minusSeconds(5));
- static final TestRecord<String, String> b = new TestRecord<>("foo", "b", T.minusSeconds(6));
- static final TestRecord<String, String> C = new TestRecord<>("foo", "C", T.minusSeconds(9));
- static final TestRecord<String, String> c = new TestRecord<>("foo", "c", T.minusSeconds(10));
- static final TestRecord<String, String> D = new TestRecord<>("foo", "D", T.minusSeconds(15));
- static final TestRecord<String, String> d = new TestRecord<>("foo", "d", T.minusSeconds(14));
- static final TestRecord<String, String> E = new TestRecord<>("foo", "E", T.minusSeconds(40));
- static final TestRecord<String, String> F = new TestRecord<>("foo", "F", T.minusSeconds(60));
- static final TestRecord<String, String> f = new TestRecord<>("foo", "f", T.minusSeconds(80));
- static final TestRecord<String, String> G = new TestRecord<>("foo", "G", T.minusSeconds(100));
+ TestRecord<String, String> A = recordOf("A", 3);
+ TestRecord<String, String> a = recordOf("a", 4);
+ TestRecord<String, String> B = recordOf("B", 5);
+ TestRecord<String, String> b = recordOf("b", 6);
+ TestRecord<String, String> C = recordOf("C", 9);
+ TestRecord<String, String> c = recordOf("c", 10);
+ TestRecord<String, String> D = recordOf("D", 15);
+ TestRecord<String, String> d = recordOf("d", 14);
+ TestRecord<String, String> E = recordOf("E", 40);
+ TestRecord<String, String> F = recordOf("F", 60);
+ TestRecord<String, String> f = recordOf("f", 80);
+ TestRecord<String, String> G = recordOf("G", 100);
@Test
assertThat(noResult());
}
+
+
boolean noResult()
{
return outJoined.isEmpty();
}
+ TestRecord<String, String> recordOf(String value, int second)
+ {
+ return new TestRecord<>("foo", value, T.minusSeconds(second));
+ }
+
Stream<String> outcome()
{
return outJoined
.readRecordsToList()
.stream()
+ .peek(record -> log.info(
+ "Received join-outcome: {} for time {}",
+ record.value(),
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())))
.map(record -> record.value());
}
+
+
+ static final String LEFT = "TEST-LEFT";
+ static final String RIGHT = "TEST-RIGHT";
+ static final String JOINED = "TEST-JOINED";
+
+ static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
+
+
+ TestInputTopic<String, String> inLeft;
+ TestInputTopic<String, String> inRight;
+ TestOutputTopic<String, String> outJoined;
}