abe0685f2a08276be66f2fff8c1a206772bfddde
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / recorder / RecorderApplication.java
1 package de.juplo.kafka.wordcount.recorder;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerConfig;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication;
8 import org.springframework.boot.context.properties.EnableConfigurationProperties;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.util.Assert;
11
12 import java.util.Properties;
13
14
15 @SpringBootApplication
16 @EnableConfigurationProperties(RecorderApplicationProperties.class)
17 public class RecorderApplication
18 {
19         @Bean(destroyMethod = "close")
20         KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
21         {
22                 Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
23
24                 Properties props = new Properties();
25                 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
26                 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
27                 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
28
29                 return new KafkaProducer<>(props);
30         }
31
32         public static void main(String[] args)
33         {
34                 SpringApplication.run(RecorderApplication.class, args);
35         }
36 }