1 package de.juplo.kafka.wordcount.splitter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.StreamsBuilder;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.kstream.KStream;
9 import java.util.Arrays;
10 import java.util.Properties;
11 import java.util.regex.Pattern;
// NOTE(review): this listing appears to be a lossy extraction. Each line is prefixed with its
// original line number, and several lines (annotations such as the Lombok @Slf4j that `log`
// relies on, braces, parameters) are missing. Restore it from the upstream source before compiling.
/**
 * Kafka Streams processor that splits the sentence of each incoming recording into words.
 */
15 public class SplitterStreamProcessor
// Matches one or more consecutive non-alphabetic characters (Unicode-aware via
// \p{IsAlphabetic}); used as the delimiter when splitting a sentence into words.
17 final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+");
// The KafkaStreams instance created from the splitter topology in the constructor.
19 public final KafkaStreams streams;
/**
 * Builds the splitter topology and wraps it in a {@link KafkaStreams} instance.
 *
 * <p>NOTE(review): the parameter lines between "(" and "Properties properties)" are missing
 * from this listing. {@code inputTopic} and {@code outputTopic} are used below, so they are
 * presumably constructor parameters; confirm against the original source. The visible lines
 * only construct the {@link KafkaStreams} instance and do not start it.
 *
 * @param properties Kafka Streams configuration handed to the {@link KafkaStreams} constructor
 */
22 public SplitterStreamProcessor(
25 Properties properties)
27 Topology topology = buildTopology(inputTopic, outputTopic);
28 streams = new KafkaStreams(topology, properties);
/**
 * Builds the splitter topology. It reads {@code Recording} values keyed by {@code User} from
 * {@code inputTopic} and flat-maps each value into one {@code Word} per token of the
 * recording's sentence, keeping the original key.
 *
 * <p>NOTE(review): the parameter list, the sink (presumably writing to {@code outputTopic})
 * and the return statement are not visible in this listing; confirm against the original.
 */
31 static Topology buildTopology(
35 StreamsBuilder builder = new StreamsBuilder();
37 KStream<User, Recording> source = builder.stream(inputTopic);
// Split the sentence on runs of non-alphabetic characters and pair every resulting
// token with the recording's user.
// NOTE(review): Pattern.split yields a leading empty string when the sentence starts with a
// non-alphabetic character (e.g. a leading space or quote), which would emit an empty Word.
// Consider filtering empty tokens; confirm the intended behavior.
40 .flatMapValues(recording -> Arrays
41 .stream(PATTERN.split(recording.getSentence()))
42 .map(word -> Word.of(recording.getUser(), word))
// Build the topology and log its description so the processor wiring appears in the logs.
46 Topology topology = builder.build();
47 log.info("\n\n{}", topology.describe());
// NOTE(review): only the log statements of what are presumably the start/stop lifecycle
// methods survive in this listing. Their signatures and any calls on `streams` are missing.
54 log.info("Starting Stream-Processor");
60 log.info("Stopping Stream-Processor");