#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-kafkatemplate-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.0-SNAPSHOT'
+version = '1.0-kafkatemplate-SNAPSHOT'
java {
toolchain {
}
dependencies {
- implementation 'org.apache.kafka:kafka-clients'
+ implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
- testImplementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:1.0-kafkatemplate-SNAPSHOT
environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: producer
+ spring.kafka.bootstrap-servers: kafka:9092
+ spring.kafka.client-id: producer
juplo.producer.topic: test
consumer:
<groupId>de.juplo.kafka</groupId>
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
- <description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>1.0-SNAPSHOT</version>
+ <description>A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka</description>
+ <version>1.0-kafkatemplate-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
package de.juplo.kafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
import java.time.Duration;
-import java.util.Properties;
@Configuration
@Bean
public ExampleProducer exampleProducer(
ApplicationProperties properties,
- Producer<String, String> kafkaProducer,
+ KafkaProperties kafkaProperties,
+ KafkaTemplate<String, String> kafkaTemplate,
ConfigurableApplicationContext applicationContext)
{
return
new ExampleProducer(
- properties.getClientId(),
+ kafkaProperties.getClientId(),
properties.getProducerProperties().getTopic(),
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
: properties.getProducerProperties().getThrottle(),
- kafkaProducer,
+ kafkaTemplate,
() -> applicationContext.close());
}
-
- @Bean(destroyMethod = "")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("acks", properties.getProducerProperties().getAcks());
- props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis());
- props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis());
- props.put("buffer.memory", properties.getProducerProperties().getBufferMemory());
- props.put("batch.size", properties.getProducerProperties().getBatchSize());
- props.put("metadata.max.age.ms", 5000); // 5 Sekunden
- props.put("request.timeout.ms", 5000); // 5 Sekunden
- props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis());
- props.put("compression.type", properties.getProducerProperties().getCompressionType());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- return new KafkaProducer<>(props);
- }
}
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
-
@NotNull
private ProducerProperties producer;
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String acks;
- @NotNull
- private Duration deliveryTimeout;
- @NotNull
- private Duration maxBlock;
- @NotNull
- private Long bufferMemory;
- @NotNull
- private Integer batchSize;
- @NotNull
- private Duration linger;
- @NotNull
- @NotEmpty
- private String compressionType;
private Duration throttle;
}
}
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
@Slf4j
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, String> producer;
+ private final KafkaTemplate<String, String> kafkaTemplate;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, String> producer,
+ KafkaTemplate<String, String> kafkaTemplate,
Runnable closeCallback)
{
this.id = id;
this.topic = topic;
this.throttle = throttle;
- this.producer = producer;
+ this.kafkaTemplate = kafkaTemplate;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
workerThread.start();
}
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
}
}
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
+ kafkaTemplate.send(topic, key, value).whenComplete((result, e) ->
{
long now = System.currentTimeMillis();
if (e == null)
{
// HANDLE SUCCESS
+ RecordMetadata metadata = result.getRecordMetadata();
log.debug(
"{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
id,
juplo:
- bootstrap-server: :9092
- client-id: DEV
producer:
topic: test
- acks: -1
- delivery-timeout: 10s
- max-block: 5s
- buffer-memory: 33554432
- batch-size: 16384
- linger: 0
- compression-type: gzip
throttle: 500
+spring:
+ kafka:
+ bootstrap-servers: :9092
+ client-id: DEV
+ producer:
+ acks: -1
+ buffer-memory: 33554432
+ batch-size: 16384
+ compression-type: gzip
+ properties:
+ delivery-timeout: 10s
+ max-block: 5s
+ linger: 0
management:
endpoint:
shutdown:
enabled: true
info:
kafka:
- bootstrap-server: ${juplo.bootstrap-server}
- client-id: ${juplo.client-id}
producer:
topic: ${juplo.producer.topic}
- acks: ${juplo.producer.acks}
- delivery-timeout: ${juplo.producer.delivery-timeout}
- max-block: ${juplo.producer.max-block}
- buffer-memory: ${juplo.producer.buffer-memory}
- batch-size: ${juplo.producer.batch-size}
- linger: ${juplo.producer.linger}
- compression-type: ${juplo.producer.compression-type}
throttle: ${juplo.producer.throttle}
logging:
level:
@SpringBootTest(
properties = {
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest",
- "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.producer.topic=" + TOPIC})
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)