counter: 1.1.5 - Fixed a bug in the integration-test `CounterApplicationIT`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
index a823e25..b19502d 100644 (file)
@@ -7,6 +7,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 
 import java.util.Properties;
 
@@ -21,6 +23,7 @@ public class CounterStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
+                       KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
@@ -40,7 +43,7 @@ public class CounterStreamProcessor
                                        }
                                })
                                .groupByKey()
-                               .count()
+                               .count(Materialized.as(storeSupplier))
                                .mapValues(value->Long.toString(value))
                                .toStream()
                                .to(outputTopic);