1 package de.juplo.kafka.wordcount.splitter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.StreamsBuilder;
6 import org.apache.kafka.streams.kstream.KStream;
8 import java.util.Arrays;
9 import java.util.Properties;
10 import java.util.regex.Pattern;
// Kafka Streams processor that breaks the sentence of each Recording into
// fragments and turns every fragment into a Word carrying the recording's user.
14 public class SplitterStreamProcessor
// Delimiter for splitting sentences: one or more consecutive characters that
// are not alphabetic (Unicode "Alphabetic" property).
// NOTE(review): Pattern.split keeps a leading empty string when the input
// starts with a delimiter, and returns the input itself when nothing matches.
// A sentence that begins with punctuation/whitespace, or an empty sentence,
// would therefore yield an empty word -- confirm whether that is intended.
16 final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+");
// The Kafka Streams instance built by the constructor from the topology and
// the supplied properties. Exposed as a public field.
18 public final KafkaStreams streams;
// Defines the topology and creates the KafkaStreams instance for it.
21 public SplitterStreamProcessor(
// properties: configuration handed unchanged to the KafkaStreams constructor.
24 Properties properties)
26 StreamsBuilder builder = new StreamsBuilder();
// Stream of Recording values with String keys, read from inputTopic
// (presumably a constructor parameter naming the topic -- verify).
28 KStream<String, Recording> source = builder.stream(inputTopic);
// Expands one recording into several values: the sentence is split at
// PATTERN, and each resulting fragment becomes a Word that also carries
// the user of the originating recording.
31 .flatMapValues(recording -> Arrays
32 .stream(PATTERN.split(recording.getSentence()))
33 .map(word -> Word.of(recording.getUser(), word))
// Builds the topology defined on the builder and binds it to the properties.
37 streams = new KafkaStreams(builder.build(), properties);
// Lifecycle log message for startup. `log` is presumably the SLF4J logger
// generated by Lombok's @Slf4j -- confirm the annotation is on the class.
42 log.info("Starting Stream-Processor");
// Lifecycle log message for shutdown.
48 log.info("Stopping Stream-Processor");