<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
package de.juplo.kafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.ProducerFactory;
import java.time.Duration;
-import java.util.Properties;
@Configuration
{
@Bean
public ExampleProducer exampleProducer(
+ @Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
Producer<String, String> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
new ExampleProducer(
- properties.getClientId(),
+ clientId,
properties.getProducerProperties().getTopic(),
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
}
@Bean(destroyMethod = "")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public Producer<?, ?> kafkaProducer(ProducerFactory<?, ?> producerFactory)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("acks", properties.getProducerProperties().getAcks());
- props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis());
- props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis());
- props.put("buffer.memory", properties.getProducerProperties().getBufferMemory());
- props.put("batch.size", properties.getProducerProperties().getBatchSize());
- props.put("metadata.max.age.ms", 5000); // 5 Sekunden
- props.put("request.timeout.ms", 5000); // 5 Sekunden
- props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis());
- props.put("compression.type", properties.getProducerProperties().getCompressionType());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- return new KafkaProducer<>(props);
+ return producerFactory.createProducer();
}
}
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
-
@NotNull
private ProducerProperties producer;
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String acks;
- @NotNull
- private Duration deliveryTimeout;
- @NotNull
- private Duration maxBlock;
- @NotNull
- private Long bufferMemory;
- @NotNull
- private Integer batchSize;
- @NotNull
- private Duration linger;
- @NotNull
- @NotEmpty
- private String compressionType;
private Duration throttle;
}
}