projects
/
demos
/
kafka
/
wordcount
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
splitter: 1.0.0-vanilla-kafka - Factored out splitting logic
[demos/kafka/wordcount]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
wordcount
/
splitter
/
SplitterStreamProcessor.java
diff --git
a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
index
191428c
..
fabae8f
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
@@
-19,7
+19,6
@@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Collectors;
@@
-27,8
+26,7
@@
import java.util.stream.Collectors;
@Slf4j
public class SplitterStreamProcessor implements Runnable
{
@Slf4j
public class SplitterStreamProcessor implements Runnable
{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
+ private final MessageSplitter splitter;
private final String inputTopic;
private final String outputTopic;
private final KafkaConsumer<String, String> consumer;
private final String inputTopic;
private final String outputTopic;
private final KafkaConsumer<String, String> consumer;
@@
-43,10
+41,13
@@
public class SplitterStreamProcessor implements Runnable
private long lastCommit;
public SplitterStreamProcessor(
private long lastCommit;
public SplitterStreamProcessor(
+ MessageSplitter splitter,
SplitterApplicationProperties properties,
Clock clock,
TaskExecutor executor)
{
SplitterApplicationProperties properties,
Clock clock,
TaskExecutor executor)
{
+ this.splitter = splitter;
+
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
@@
-106,7
+107,7
@@
public class SplitterStreamProcessor implements Runnable
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
- String[] words =
PATTERN
.split(inputRecord.value());
+ String[] words =
splitter
.split(inputRecord.value());
for (int i = 0; i < words.length; i++)
{
ProducerRecord<String, String> outputRecord =
for (int i = 0; i < words.length; i++)
{
ProducerRecord<String, String> outputRecord =