X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FRecorderApplication.java;h=11248d8222ba96fec28d01ca43b53ad8a7749051;hb=refs%2Fheads%2Frecorder;hp=abe0685f2a08276be66f2fff8c1a206772bfddde;hpb=b185f168c230e65878ed4c99fc69229f356c41da;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java index abe0685..6702a70 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java @@ -2,11 +2,11 @@ package de.juplo.kafka.wordcount.recorder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.Assert; import java.util.Properties; @@ -17,14 +17,15 @@ import java.util.Properties; public class RecorderApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(RecorderApplicationProperties properties) + KafkaProducer producer(RecorderApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); return new KafkaProducer<>(props); }