import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import java.util.Properties;
String inputTopic,
String outputTopic,
Properties properties,
+ KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper mapper)
{
StreamsBuilder builder = new StreamsBuilder();
}
})
.groupByKey()
- .count()
+ .count(Materialized.as(storeSupplier))
.mapValues(value->Long.toString(value))
.toStream()
.to(outputTopic);