From 55d9c923b04389ed5c6b9d6794587fa794384a5f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 Jun 2022 13:05:46 +0200 Subject: [PATCH] splitter: 1.0.0-vanilla-kafka - Factored out splitting logic --- .../wordcount/splitter/MessageSplitter.java | 17 +++++++++++++++++ .../splitter/SplitterStreamProcessor.java | 9 +++++---- 2 files changed, 22 insertions(+), 4 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java new file mode 100644 index 0000000..0665f3e --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.wordcount.splitter; + +import org.springframework.stereotype.Component; + +import java.util.regex.Pattern; + + +@Component +public class MessageSplitter +{ + final static Pattern PATTERN = Pattern.compile("\\W+"); + + String[] split(String message) + { + return PATTERN.split(message); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 191428c..fabae8f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -27,8 +26,7 @@ import java.util.stream.Collectors; @Slf4j public class SplitterStreamProcessor implements Runnable { - final static Pattern PATTERN = Pattern.compile("\\W+"); - + private final MessageSplitter splitter; private final String inputTopic; private final String outputTopic; private final KafkaConsumer consumer; @@ -43,10 +41,13 @@ public class SplitterStreamProcessor implements Runnable private long lastCommit; public SplitterStreamProcessor( + MessageSplitter splitter, SplitterApplicationProperties properties, Clock clock, TaskExecutor executor) { + this.splitter = splitter; + this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); @@ -106,7 +107,7 @@ public class SplitterStreamProcessor implements Runnable offsets[inputRecord.partition()] = inputRecord.offset(); leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch(); - String[] words = PATTERN.split(inputRecord.value()); + String[] words = splitter.split(inputRecord.value()); for (int i = 0; i < words.length; i++) { ProducerRecord outputRecord = -- 2.20.1