fa8466578a165da86df378f6859ecf557d66567e
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterStreamProcessor.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.StreamsBuilder;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.kstream.KStream;
8
9 import java.util.Arrays;
10 import java.util.Properties;
11 import java.util.regex.Pattern;
12
13
14 @Slf4j
15 public class SplitterStreamProcessor
16 {
17         final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+");
18
19         public final KafkaStreams streams;
20
21
22         public SplitterStreamProcessor(
23                         String inputTopic,
24                         String outputTopic,
25                         Properties properties)
26         {
27                 Topology topology = buildTopology(inputTopic, outputTopic);
28                 streams = new KafkaStreams(topology, properties);
29         }
30
31         static Topology buildTopology(
32                         String inputTopic,
33                         String outputTopic)
34         {
35                 StreamsBuilder builder = new StreamsBuilder();
36
37                 KStream<User, Recording> source = builder.stream(inputTopic);
38
39                 source
40                                 .flatMapValues(recording -> Arrays
41                                                         .stream(PATTERN.split(recording.getSentence()))
42                                                         .map(word -> Word.of(recording.getUser(), word))
43                                                         .toList())
44                                 .to(outputTopic);
45
46                 Topology topology = builder.build();
47                 log.info("\n\n{}", topology.describe());
48
49                 return topology;
50         }
51
52         public void start()
53         {
54                 log.info("Starting Stream-Processor");
55                 streams.start();
56         }
57
58         public void stop()
59         {
60                 log.info("Stopping Stream-Processor");
61                 streams.close();
62         }
63 }