projects
/
demos
/
kafka
/
wordcount
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
splitter: 1.1.1 - The configured default-serde is used for serialization
[demos/kafka/wordcount]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
wordcount
/
splitter
/
SplitterStreamProcessor.java
diff --git
a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
index
12816ab
..
5e4930d
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
@@
-41,8
+41,6
@@
public class SplitterStreamProcessor
JsonSerde<Recording> recordSerde =
new JsonSerde<>(Recording.class).ignoreTypeHeaders();
JsonSerde<Recording> recordSerde =
new JsonSerde<>(Recording.class).ignoreTypeHeaders();
- JsonSerde<Word> wordSerde =
- new JsonSerde<>(Word.class).noTypeInfo();
KStream<String, Recording> source = builder.stream(
properties.getInputTopic(),
KStream<String, Recording> source = builder.stream(
properties.getInputTopic(),
@@
-53,7
+51,7
@@
public class SplitterStreamProcessor
.stream(PATTERN.split(recording.getSentence()))
.map(word -> Word.of(recording.getUser(), word))
.toList())
.stream(PATTERN.split(recording.getSentence()))
.map(word -> Word.of(recording.getUser(), word))
.toList())
- .to(properties.getOutputTopic()
, Produced.with(Serdes.String(), wordSerde)
);
+ .to(properties.getOutputTopic());
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());