counter: 1.1.5 - Fixed a bug in the integration-test `CounterApplicationIT`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.StreamsBuilder;
9 import org.apache.kafka.streams.kstream.KStream;
10 import org.apache.kafka.streams.kstream.Materialized;
11 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
12
13 import java.util.Properties;
14
15
16 @Slf4j
17 public class CounterStreamProcessor
18 {
19         public final KafkaStreams streams;
20
21
22         public CounterStreamProcessor(
23                         String inputTopic,
24                         String outputTopic,
25                         Properties properties,
26                         KeyValueBytesStoreSupplier storeSupplier,
27                         ObjectMapper mapper)
28         {
29                 StreamsBuilder builder = new StreamsBuilder();
30
31                 KStream<String, String> source = builder.stream(inputTopic);
32                 source
33                                 .map((username, word) ->
34                                 {
35                                         try
36                                         {
37                                                 String key = mapper.writeValueAsString(Key.of(username, word));
38                                                 return new KeyValue<>(key, word);
39                                         }
40                                         catch (JsonProcessingException e)
41                                         {
42                                                 throw new RuntimeException(e);
43                                         }
44                                 })
45                                 .groupByKey()
46                                 .count(Materialized.as(storeSupplier))
47                                 .mapValues(value->Long.toString(value))
48                                 .toStream()
49                                 .to(outputTopic);
50
51                 streams = new KafkaStreams(builder.build(), properties);
52         }
53
54         public void start()
55         {
56                 log.info("Starting Stream-Processor");
57                 streams.start();
58         }
59
60         public void stop()
61         {
62                 log.info("Stopping Stream-Processor");
63                 streams.close();
64         }
65 }