@Slf4j
public class CounterStreamProcessor
{
+ public static final String TYPE = "COUNTER";
public static final String STORE_NAME = "counter";
StreamsBuilder builder = new StreamsBuilder();
builder
- .stream(
- inputTopic,
- Consumed.with(
- new JsonSerde<>(User.class),
- new JsonSerde<>(Word.class)))
+ .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+ .mapValues(word -> Word.of(word.getUser(), word.getWord()))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
Materialized
.<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
+ .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
.toStream()
.map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
.to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
+ public static JsonSerde<User> inKeySerde()
+ {
+ return new JsonSerde<>(User.class);
+ }
+
+ public static JsonSerde<UserWord> inValueSerde()
+ {
+ return new JsonSerde<>(UserWord.class);
+ }
+
public static JsonSerde<Word> outKeySerde()
{
return serde(true);
private static String typeMappingsConfig()
{
- return typeMappings()
- .entrySet()
- .stream()
- .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
- .collect(Collectors.joining(","));
+ return typeMappingsConfig(Word.class, WordCounter.class);
}
- private static Map<String, Class> typeMappings()
+ public static String typeMappingsConfig(Class keyClass, Class counterClass)
{
return Map.of(
- "word", Word.class,
- "counter", WordCounter.class);
+ "key", keyClass,
+ "counter", counterClass)
+ .entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
+ .collect(Collectors.joining(","));
}
}