#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-SNAPSHOT
+IMAGE=juplo/spring-producer:2.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:2.0-SNAPSHOT
environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: producer
+ spring.kafka.bootstrap-servers: kafka:9092
+ spring.kafka.client-id: producer
juplo.producer.topic: test
consumer:
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>1.0-SNAPSHOT</version>
+ <version>2.0-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
package de.juplo.kafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.ProducerFactory;
import java.time.Duration;
-import java.util.Properties;
@Configuration
{
@Bean
public ExampleProducer exampleProducer(
+ @Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
Producer<String, String> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
new ExampleProducer(
- properties.getClientId(),
+ clientId,
properties.getProducerProperties().getTopic(),
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
}
@Bean(destroyMethod = "")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public Producer<?, ?> kafkaProducer(ProducerFactory<?, ?> producerFactory)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("acks", properties.getProducerProperties().getAcks());
- props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis());
- props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis());
- props.put("buffer.memory", properties.getProducerProperties().getBufferMemory());
- props.put("batch.size", properties.getProducerProperties().getBatchSize());
- props.put("metadata.max.age.ms", 5000); // 5 Sekunden
- props.put("request.timeout.ms", 5000); // 5 Sekunden
- props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis());
- props.put("compression.type", properties.getProducerProperties().getCompressionType());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- return new KafkaProducer<>(props);
+ return producerFactory.createProducer();
}
}
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
-
@NotNull
private ProducerProperties producer;
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String acks;
- @NotNull
- private Duration deliveryTimeout;
- @NotNull
- private Duration maxBlock;
- @NotNull
- private Long bufferMemory;
- @NotNull
- private Integer batchSize;
- @NotNull
- private Duration linger;
- @NotNull
- @NotEmpty
- private String compressionType;
private Duration throttle;
}
}
juplo:
- bootstrap-server: :9092
- client-id: DEV
producer:
topic: test
- acks: -1
- delivery-timeout: 10s
- max-block: 5s
- buffer-memory: 33554432
- batch-size: 16384
- linger: 0
- compression-type: gzip
throttle: 500ms
+spring:
+ kafka:
+ bootstrap-servers: :9092
+ client-id: DEV
+ producer:
+ acks: -1
+ buffer-memory: 33554432
+ batch-size: 16384
+ compression-type: gzip
+ properties:
+ metadata.max.age.ms: 5000
+ request.timeout.ms: 5000
+ delivery.timeout.ms: 10000
+ max.block.ms: 5000
+ linger.ms: 0
management:
endpoint:
shutdown:
enabled: true
info:
kafka:
- bootstrap-server: ${juplo.bootstrap-server}
- client-id: ${juplo.client-id}
producer:
topic: ${juplo.producer.topic}
- acks: ${juplo.producer.acks}
- delivery-timeout: ${juplo.producer.delivery-timeout}
- max-block: ${juplo.producer.max-block}
- buffer-memory: ${juplo.producer.buffer-memory}
- batch-size: ${juplo.producer.batch-size}
- linger: ${juplo.producer.linger}
- compression-type: ${juplo.producer.compression-type}
throttle: ${juplo.producer.throttle}
logging:
level: