X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;h=fabae8f6d1ab4280ca60b9a4b951079ac28e030c;hb=55d9c923b04389ed5c6b9d6794587fa794384a5f;hp=191428cf9b77cc4dbac6e85271cef07a6dbb6db9;hpb=723ec6bbc03aaa4f552fd3a27986c120ea46fccf;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 191428c..fabae8f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -27,8 +26,7 @@ import java.util.stream.Collectors; @Slf4j public class SplitterStreamProcessor implements Runnable { - final static Pattern PATTERN = Pattern.compile("\\W+"); - + private final MessageSplitter splitter; private final String inputTopic; private final String outputTopic; private final KafkaConsumer consumer; @@ -43,10 +41,13 @@ public class SplitterStreamProcessor implements Runnable private long lastCommit; public SplitterStreamProcessor( + MessageSplitter splitter, SplitterApplicationProperties properties, Clock clock, TaskExecutor executor) { + this.splitter = splitter; + this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); @@ -106,7 +107,7 @@ public class SplitterStreamProcessor implements Runnable offsets[inputRecord.partition()] = inputRecord.offset(); leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch(); - String[] words = PATTERN.split(inputRecord.value()); + String[] words = splitter.split(inputRecord.value()); for (int i = 0; i < words.length; i++) { ProducerRecord outputRecord =