projects
/
demos
/
kafka
/
wordcount
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
counter: 1.4.2 - RocksDB does nor work in Alpine-Linux
[demos/kafka/wordcount]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
wordcount
/
counter
/
CounterStreamProcessor.java
diff --git
a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
index
c983a25
..
455d895
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
@@
-19,6
+19,7
@@
import java.util.stream.Collectors;
@Slf4j
public class CounterStreamProcessor
{
@Slf4j
public class CounterStreamProcessor
{
+ public static final String TYPE = "COUNTER";
public static final String STORE_NAME = "counter";
public static final String STORE_NAME = "counter";
@@
-48,6
+49,7
@@
public class CounterStreamProcessor
builder
.stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
builder
.stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+ .mapValues(word -> Word.of(word.getUser(), word.getWord()))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
@@
-88,9
+90,9
@@
public class CounterStreamProcessor
return new JsonSerde<>(User.class);
}
return new JsonSerde<>(User.class);
}
- public static JsonSerde<Word> inValueSerde()
+ public static JsonSerde<
User
Word> inValueSerde()
{
{
- return new JsonSerde<>(Word.class);
+ return new JsonSerde<>(
User
Word.class);
}
public static JsonSerde<Word> outKeySerde()
}
public static JsonSerde<Word> outKeySerde()
@@
-114,17
+116,17
@@
public class CounterStreamProcessor
private static String typeMappingsConfig()
{
private static String typeMappingsConfig()
{
- return typeMappings()
- .entrySet()
- .stream()
- .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
- .collect(Collectors.joining(","));
+ return typeMappingsConfig(Word.class, WordCounter.class);
}
}
- p
rivate static Map<String, Class> typeMappings(
)
+ p
ublic static String typeMappingsConfig(Class keyClass, Class counterClass
)
{
return Map.of(
{
return Map.of(
- "word", Word.class,
- "counter", WordCounter.class);
+ "key", keyClass,
+ "counter", counterClass)
+ .entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
+ .collect(Collectors.joining(","));
}
}
}
}