package de.juplo.kafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-
-import java.util.Properties;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.KafkaTemplate;
@Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableKafka
public class ApplicationConfiguration
{
@Bean
public RestGateway restGateway(
- ApplicationProperties properties,
- KafkaProducer<String, Integer> kafkaProducer)
+ ApplicationProperties applicationProperties,
+ KafkaProperties kafkaProperties,
+ KafkaTemplate<String, Integer> kafkaTemplate)
{
return
new RestGateway(
- properties.getClientId(),
- properties.getTopic(),
- kafkaProducer);
- }
-
- @Bean(destroyMethod = "close")
- public KafkaProducer<String, Integer> kafkaProducer(ApplicationProperties properties)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("acks", properties.getAcks());
- props.put("batch.size", properties.getBatchSize());
- props.put("delivery.timeout.ms", 20000); // 20 Sekunden
- props.put("request.timeout.ms", 10000); // 10 Sekunden
- props.put("linger.ms", properties.getLingerMs());
- props.put("compression.type", properties.getCompressionType());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", IntegerSerializer.class.getName());
-
- return new KafkaProducer<>(props);
+ kafkaProperties.getClientId(),
+ applicationProperties.getPartition(),
+ kafkaTemplate);
}
}