From: Kai Moritz Date: Mon, 8 Jul 2024 16:36:42 +0000 (+0200) Subject: Cleaned up example X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=c17ad61b5fc80c775474d9e87c3296b3e135b6d9;p=demos%2Fkafka%2Fwordcount Cleaned up example --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 13ab0ae..58a858a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -36,7 +36,6 @@ public class AggregationTopologyTest .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) .groupByKey() .windowedBy(WINDOWS) - .emitStrategy(EmitStrategy.onWindowUpdate()) .reduce((aggregate, value) -> aggregate + "-" + value) .toStream((k,v) -> k.toString()) .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) @@ -48,7 +47,6 @@ public class AggregationTopologyTest Properties properties = new Properties(); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0l); TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);