1 package de.juplo.kafka.wordcount.counter;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.StreamsBuilder;
9 import org.apache.kafka.streams.Topology;
10 import org.apache.kafka.streams.kstream.KStream;
11 import org.apache.kafka.streams.kstream.Materialized;
12 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
13 import org.apache.kafka.streams.kstream.*;
14 import org.springframework.kafka.support.serializer.JsonSerde;
16 import java.util.Properties;
20 public class CounterStreamProcessor
22 public final KafkaStreams streams;
25 public CounterStreamProcessor(
28 Properties properties,
29 KeyValueBytesStoreSupplier storeSupplier,
32 Topology topology = CounterStreamProcessor.buildTopology(
38 streams = new KafkaStreams(topology, properties);
41 static Topology buildTopology(
44 KeyValueBytesStoreSupplier storeSupplier,
47 StreamsBuilder builder = new StreamsBuilder();
49 KStream<String, Word> source = builder.stream(
53 new JsonSerde<>(Word.class)
54 .ignoreTypeHeaders()));
57 .map((key, word) -> new KeyValue<>(word, word))
58 .groupByKey(Grouped.with(
59 new JsonSerde<>(Word.class)
62 new JsonSerde<>(Word.class)
65 .<Word,Long>as(storeSupplier)
67 new JsonSerde<>(Word.class)
73 .map((word, count) -> new KeyValue<>(word, WordCount.of(word.getUser(), word.getWord(), count)))
77 new JsonSerde<>(Word.class)
80 new JsonSerde<>(WordCount.class)
83 Topology topology = builder.build();
84 log.info("\n\n{}", topology.describe());
91 log.info("Starting Stream-Processor");
97 log.info("Stopping Stream-Processor");