1 package de.juplo.kafka.wordcount.counter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.kstream.Consumed;
6 import org.apache.kafka.streams.kstream.Materialized;
7 import org.apache.kafka.streams.kstream.Produced;
8 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
9 import org.apache.kafka.streams.state.QueryableStoreTypes;
10 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
15 import java.util.Properties;
16 import java.util.stream.Collectors;
20 public class CounterStreamProcessor
22 public static final String TYPE = "COUNTER";
23 public static final String STORE_NAME = "counter";
26 public final KafkaStreams streams;
29 public CounterStreamProcessor(
32 Properties properties,
33 KeyValueBytesStoreSupplier storeSupplier)
35 Topology topology = CounterStreamProcessor.buildTopology(
40 streams = new KafkaStreams(topology, properties);
43 static Topology buildTopology(
46 KeyValueBytesStoreSupplier storeSupplier)
48 StreamsBuilder builder = new StreamsBuilder();
51 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
52 .mapValues(word -> Word.of(word.getUser(), word.getWord()))
53 .map((key, word) -> new KeyValue<>(word, word))
57 .<Word, Long>as(storeSupplier)
58 .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
60 .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
61 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
63 Topology topology = builder.build();
64 log.info("\n\n{}", topology.describe());
69 ReadOnlyKeyValueStore<Word, Long> getStore()
71 return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
76 log.info("Starting Stream-Processor");
82 log.info("Stopping Stream-Processor");
88 public static JsonSerde<User> inKeySerde()
90 return new JsonSerde<>(User.class);
93 public static JsonSerde<UserWord> inValueSerde()
95 return new JsonSerde<>(UserWord.class);
98 public static JsonSerde<Word> outKeySerde()
103 public static JsonSerde<WordCounter> outValueSerde()
108 public static <T> JsonSerde<T> serde(boolean isKey)
110 JsonSerde<T> serde = new JsonSerde<>();
112 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
117 private static String typeMappingsConfig()
119 return typeMappingsConfig(Word.class, WordCounter.class);
122 public static String typeMappingsConfig(Class keyClass, Class counterClass)
126 "counter", counterClass)
129 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
130 .collect(Collectors.joining(","));