return typeMappingsConfig(WindowedWord.class, WordCounter.class);
}
- public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
+ public static String typeMappingsConfig(Class keyClass, Class counterClass)
{
return Map.of(
- "word", wordClass,
- "counter", wordCounterClass)
+ "key", keyClass,
+ "counter", counterClass)
.entrySet()
.stream()
.map(entry -> entry.getKey() + ":" + entry.getValue().getName())
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.stats.OutputWindowedWord,counter:de.juplo.kafka.wordcount.stats.OutputWordCounter",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.stats.OutputWindowedWord,counter:de.juplo.kafka.wordcount.stats.OutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.popular.bootstrap-server=${spring.embedded.kafka.brokers}",