projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
723ec6b
)
splitter: 1.0.0-vanilla-kafka - Factored out splitting logic
author
Kai Moritz
<kai@juplo.de>
Sun, 26 Jun 2022 11:05:46 +0000
(13:05 +0200)
committer
Kai Moritz
<kai@juplo.de>
Thu, 30 Jun 2022 18:13:51 +0000
(20:13 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
[new file with mode: 0644]
patch
|
blob
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
patch
|
blob
|
history
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
new file mode 100644
(file)
index 0000000..
0665f3e
--- /dev/null
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
@@ -0,0
+1,17
@@
+package de.juplo.kafka.wordcount.splitter;
+
+import org.springframework.stereotype.Component;
+
+import java.util.regex.Pattern;
+
+
+@Component
+public class MessageSplitter
+{
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+ String[] split(String message)
+ {
+ return PATTERN.split(message);
+ }
+}
diff --git
a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
index
191428c
..
fabae8f
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
@@
-19,7
+19,6
@@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Collectors;
@@
-27,8
+26,7
@@
import java.util.stream.Collectors;
@Slf4j
public class SplitterStreamProcessor implements Runnable
{
@Slf4j
public class SplitterStreamProcessor implements Runnable
{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
+ private final MessageSplitter splitter;
private final String inputTopic;
private final String outputTopic;
private final KafkaConsumer<String, String> consumer;
private final String inputTopic;
private final String outputTopic;
private final KafkaConsumer<String, String> consumer;
@@
-43,10
+41,13
@@
public class SplitterStreamProcessor implements Runnable
private long lastCommit;
public SplitterStreamProcessor(
private long lastCommit;
public SplitterStreamProcessor(
+ MessageSplitter splitter,
SplitterApplicationProperties properties,
Clock clock,
TaskExecutor executor)
{
SplitterApplicationProperties properties,
Clock clock,
TaskExecutor executor)
{
+ this.splitter = splitter;
+
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
@@
-106,7
+107,7
@@
public class SplitterStreamProcessor implements Runnable
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
- String[] words =
PATTERN
.split(inputRecord.value());
+ String[] words =
splitter
.split(inputRecord.value());
for (int i = 0; i < words.length; i++)
{
ProducerRecord<String, String> outputRecord =
for (int i = 0; i < words.length; i++)
{
ProducerRecord<String, String> outputRecord =