WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersApplication.java
index 9ae44e1..98aa164 100644 (file)
@@ -1,5 +1,8 @@
 package de.juplo.kafka.wordcount.users;
 
+import de.juplo.kafka.wordcount.avro.User;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -17,14 +20,15 @@ import java.util.Properties;
 public class UsersApplication
 {
        @Bean(destroyMethod = "close")
-       KafkaProducer<String, String> producer(UsersApplicationProperties properties)
+       KafkaProducer<String, User> producer(UsersApplicationProperties properties)
        {
                Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
 
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
+               props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
 
                return new KafkaProducer<>(props);
        }