@Slf4j
public class CounterStreamProcessor
{
+ public static final String TYPE = "COUNTER";
public static final String STORE_NAME = "counter";
builder
.stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+ .mapValues(word -> Word.of(word.getUser(), word.getWord()))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
return new JsonSerde<>(User.class);
}
- public static JsonSerde<Word> inValueSerde()
+ public static JsonSerde<UserWord> inValueSerde()
{
- return new JsonSerde<>(Word.class);
+ return new JsonSerde<>(UserWord.class);
}
public static JsonSerde<Word> outKeySerde()
return typeMappingsConfig(Word.class, WordCounter.class);
}
- public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
+ public static String typeMappingsConfig(Class keyClass, Class counterClass)
{
return Map.of(
- "word", wordClass,
- "counter", wordCounterClass)
+ "key", keyClass,
+ "counter", counterClass)
.entrySet()
.stream()
.map(entry -> entry.getKey() + ":" + entry.getValue().getName())