recorder: 1.2.2 - RocksDB does nor work in Alpine-Linux
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / recorder / RecorderApplication.java
index 11248d8..699c671 100644 (file)
@@ -2,11 +2,11 @@ package de.juplo.kafka.wordcount.recorder;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.Assert;
 
 import java.util.Properties;
@@ -17,14 +17,15 @@ import java.util.Properties;
 public class RecorderApplication
 {
        @Bean(destroyMethod = "close")
-       KafkaProducer<String, Recording> producer(RecorderApplicationProperties properties)
+       KafkaProducer<User, Recording> producer(RecorderApplicationProperties properties)
        {
                Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
 
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+               props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
 
                return new KafkaProducer<>(props);
        }