.peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
.groupByKey()
.windowedBy(WINDOWS)
- .emitStrategy(EmitStrategy.onWindowUpdate())
.reduce((aggregate, value) -> aggregate + "-" + value)
.toStream((k,v) -> k.toString())
.peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
Properties properties = new Properties();
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
- properties.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0l);
TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);