X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersApplication.java;h=98aa16458e76bd674e3dcca9a7a4a703b86a2975;hb=36381bd0a4dea9bd10494cb4919de6c7fe5c4190;hp=9ae44e1fdefde706b2bc4d6519af00842174cc69;hpb=fc7b9dfe12a1401e7365d85ec723364750bdcd3d;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java index 9ae44e1..98aa164 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -1,5 +1,8 @@ package de.juplo.kafka.wordcount.users; +import de.juplo.kafka.wordcount.avro.User; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -17,14 +20,15 @@ import java.util.Properties; public class UsersApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(UsersApplicationProperties properties) + KafkaProducer producer(UsersApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); return new KafkaProducer<>(props); }