1 package de.juplo.kafka.wordcount.users;
3 import de.juplo.kafka.wordcount.avro.User;
4 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
5 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerConfig;
8 import org.apache.kafka.common.serialization.StringSerializer;
9 import org.springframework.boot.SpringApplication;
10 import org.springframework.boot.autoconfigure.SpringBootApplication;
11 import org.springframework.boot.context.properties.EnableConfigurationProperties;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.util.Assert;
15 import java.util.Properties;
18 @SpringBootApplication
19 @EnableConfigurationProperties(UsersApplicationProperties.class)
20 public class UsersApplication
22 @Bean(destroyMethod = "close")
23 KafkaProducer<String, User> producer(UsersApplicationProperties properties)
25 Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
27 Properties props = new Properties();
28 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
29 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
30 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
31 props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
33 return new KafkaProducer<>(props);
36 public static void main(String[] args)
38 SpringApplication.run(UsersApplication.class, args);