splitter: 1.1.4 - Moved code for config/init into `SplitterApplication`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterStreamProcessor.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.StreamsBuilder;
6 import org.apache.kafka.streams.kstream.KStream;
7
8 import java.util.Arrays;
9 import java.util.Properties;
10 import java.util.regex.Pattern;
11
12
13 @Slf4j
14 public class SplitterStreamProcessor
15 {
16         final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+");
17
18         public final KafkaStreams streams;
19
20
21         public SplitterStreamProcessor(
22                         String inputTopic,
23                         String outputTopic,
24                         Properties properties)
25         {
26                 StreamsBuilder builder = new StreamsBuilder();
27
28                 KStream<String, Recording> source = builder.stream(inputTopic);
29
30                 source
31                                 .flatMapValues(recording -> Arrays
32                                                         .stream(PATTERN.split(recording.getSentence()))
33                                                         .map(word -> Word.of(recording.getUser(), word))
34                                                         .toList())
35                                 .to(outputTopic);
36
37                 streams = new KafkaStreams(builder.build(), properties);
38         }
39
40         public void start()
41         {
42                 log.info("Starting Stream-Processor");
43                 streams.start();
44         }
45
46         public void stop()
47         {
48                 log.info("Stopping Stream-Processor");
49                 streams.close();
50         }
51 }