Serdes.String().deserializer());
- inLeft.pipeInput(recordOf("A", 3));
+ sendLeftAt("A", 3);
assertThatOutcomeIs();
- inRight.pipeInput(recordOf("a", 4));
+ sendRightAt("a", 4);
assertThatOutcomeIs("A-a");
- inLeft.pipeInput(recordOf("B", 5));
+ sendLeftAt("B", 5);
assertThatOutcomeIs("B-a");
- inRight.pipeInput(recordOf("b", 6));
+ sendRightAt("b", 6);
assertThatOutcomeIs("A-b", "B-b");
- inLeft.pipeInput(recordOf("C", 9));
+ sendLeftAt("C", 9);
assertThatOutcomeIs("C-a", "C-b");
- inRight.pipeInput(recordOf("c", 10));
+ sendRightAt("c", 10);
assertThatOutcomeIs("A-c", "B-c", "C-c");
- inRight.pipeInput(recordOf("d", 14));
+ sendRightAt("d", 14);
assertThatOutcomeIs("B-d", "C-d"); // !
- inLeft.pipeInput(recordOf("D", 15));
+ sendLeftAt("D", 15);
assertThatOutcomeIs("D-b", "D-c", "D-d");
- inLeft.pipeInput(recordOf("E", 40));
+ sendLeftAt("E", 40);
assertThatOutcomeIs();
- inLeft.pipeInput(recordOf("F", 60));
+ sendLeftAt("F", 60);
assertThatOutcomeIs();
- inRight.pipeInput(recordOf("f", 80));
+ sendRightAt("f", 80);
assertThatOutcomeIs();
- inLeft.pipeInput(recordOf("G", 100));
+ sendLeftAt("G", 100);
assertThatOutcomeIs();
}
+ void sendLeftAt(String value, int second)
+ {
+ TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ log.info(
+ "Sending left: {} at time {}",
+ record.value(),
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+ inLeft.pipeInput(record);
+ }
+
+ void sendRightAt(String value, int second)
+ {
+ TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ log.info(
+ "Sending right: {} at time {}",
+ record.value(),
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+ inRight.pipeInput(record);
+ }
TestRecord<String, String> recordOf(String value, int second)
{