package de.juplo.kafka;
import org.apache.kafka.clients.producer.Producer;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
{
@Bean
public ExampleProducer exampleProducer(
+ @Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
Producer<String, String> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
new ExampleProducer(
- properties.getClientId(),
+ clientId,
properties.getProducerProperties().getTopic(),
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
-
@NotNull
private ProducerProperties producer;
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String acks;
- @NotNull
- private Duration deliveryTimeout;
- @NotNull
- private Duration maxBlock;
- @NotNull
- private Long bufferMemory;
- @NotNull
- private Integer batchSize;
- @NotNull
- private Duration linger;
- @NotNull
- @NotEmpty
- private String compressionType;
private Duration throttle;
}
}