propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(
- JsonDeserializer.TYPE_MAPPINGS,
- "user:" + User.class.getName() + "," +
- "word:" + Word.class.getName() + "," +
- "counter:" + WordCounter.class.getName());
+ propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
return propertyMap;
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
@Slf4j
.withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
.toStream()
.map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
- .to(outputTopic);
+ .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
Topology topology = builder.build();
log.info("\n\n{}", topology.describe());
log.info("Stopping Stream-Processor");
streams.close();
}
+
+
+
+ public static JsonSerde<Word> outKeySerde()
+ {
+ return serde(true);
+ }
+
+ public static JsonSerde<WordCounter> outValueSerde()
+ {
+ return serde(false);
+ }
+
+ public static <T> JsonSerde<T> serde(boolean isKey)
+ {
+ JsonSerde<T> serde = new JsonSerde<>();
+ serde.configure(
+ Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
+ isKey);
+ return serde;
+ }
+
+ private static String typeMappingsConfig()
+ {
+ return typeMappings()
+ .entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
+ .collect(Collectors.joining(","));
+ }
+
+ private static Map<String, Class> typeMappings()
+ {
+ return Map.of(
+ "word", Word.class,
+ "counter", WordCounter.class);
+ }
}