WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersApplication.java
1 package de.juplo.kafka.wordcount.users;
2
3 import de.juplo.kafka.wordcount.avro.User;
4 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
5 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerConfig;
8 import org.apache.kafka.common.serialization.StringSerializer;
9 import org.springframework.boot.SpringApplication;
10 import org.springframework.boot.autoconfigure.SpringBootApplication;
11 import org.springframework.boot.context.properties.EnableConfigurationProperties;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.util.Assert;
14
15 import java.util.Properties;
16
17
18 @SpringBootApplication
19 @EnableConfigurationProperties(UsersApplicationProperties.class)
20 public class UsersApplication
21 {
22         @Bean(destroyMethod = "close")
23         KafkaProducer<String, User> producer(UsersApplicationProperties properties)
24         {
25                 Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
26
27                 Properties props = new Properties();
28                 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
29                 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
30                 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
31                 props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
32
33                 return new KafkaProducer<>(props);
34         }
35
36         public static void main(String[] args)
37         {
38                 SpringApplication.run(UsersApplication.class, args);
39         }
40 }