@Slf4j
public class AggregationTopologyTest
{
- static final Instant T0 = Instant.ofEpochSecond(60);
-
-
@Test
public void test()
{
Serdes.String().deserializer());
- sendAt("A", 3);
+ sendAt("A", 63);
assertThatOutcomeIs("A");
- sendAt("B", 4);
+ sendAt("B", 64);
assertThatOutcomeIs("A-B");
- sendAt("C", 5);
+ sendAt("C", 65);
assertThatOutcomeIs("A-B-C");
- sendAt("D", 6);
+ sendAt("D", 66);
assertThatOutcomeIs("A-B-C-D");
- sendAt("E", 9);
+ sendAt("E", 69);
assertThatOutcomeIs("A-B-C-D-E");
- sendAt("F", 10);
+ sendAt("F", 70);
assertThatOutcomeIs("A-B-C-D-E-F");
- sendAt("G", 14);
+ sendAt("G", 74);
assertThatOutcomeIs("A-B-C-D-E-F-G");
- sendAt("H", 15);
+ sendAt("H", 75);
assertThatOutcomeIs("A-B-C-D-E-F-G-H");
- sendAt("I", 40);
+ sendAt("I", 100);
assertThatOutcomeIs("A-B-C-D-E-F-G-H-I");
- sendAt("J", 60);
+ sendAt("J", 120);
assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J");
- sendAt("K", 80);
+ sendAt("K", 140);
assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K");
- sendAt("L", 100);
+ sendAt("L", 160);
assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L");
}
void sendAt(String value, int second)
{
- TestRecord<String, String> record = new TestRecord<>(KEY, value, T0.plusSeconds(second));
+ TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
log.info(
"Sending at {} ({}): {}",
record.getRecordTime().toEpochMilli(),