#!/bin/bash
-IMAGE=juplo/spring-producer:2.0-SNAPSHOT
+IMAGE=juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '2.0-SNAPSHOT'
+version = '2.0-kafkatemplate-SNAPSHOT'
java {
toolchain {
}
dependencies {
- implementation 'org.apache.kafka:kafka-clients'
+ implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
- testImplementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
- kafka-3
producer:
- image: juplo/spring-producer:2.0-SNAPSHOT
+ image: juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: producer
<groupId>de.juplo.kafka</groupId>
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
- <description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>2.0-SNAPSHOT</version>
+ <description>A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka</description>
+ <version>2.0-kafkatemplate-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
package de.juplo.kafka;
-import org.apache.kafka.clients.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
import java.time.Duration;
public ExampleProducer exampleProducer(
@Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
- Producer<String, String> kafkaProducer,
+ KafkaTemplate<String, String> kafkaTemplate,
ConfigurableApplicationContext applicationContext)
{
return
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
: properties.getProducerProperties().getThrottle(),
- kafkaProducer,
+ kafkaTemplate,
() -> applicationContext.close());
}
-
- @Bean(destroyMethod = "")
- public Producer<?, ?> kafkaProducer(ProducerFactory<?, ?> producerFactory)
- {
- return producerFactory.createProducer();
- }
}
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.core.KafkaTemplate;
import java.time.Duration;
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, String> producer;
+ private final KafkaTemplate<String, String> kafkaTemplate;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, String> producer,
+ KafkaTemplate<String, String> kafkaTemplate,
Runnable closeCallback)
{
this.id = id;
this.topic = topic;
this.throttle = throttle;
- this.producer = producer;
+ this.kafkaTemplate = kafkaTemplate;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
workerThread.start();
}
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
}
}
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
+ kafkaTemplate.send(topic, key, value).whenComplete((result, e) ->
{
long now = System.currentTimeMillis();
if (e == null)
{
// HANDLE SUCCESS
+ RecordMetadata metadata = result.getRecordMetadata();
log.debug(
"{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
id,
@SpringBootTest(
properties = {
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest",
"juplo.producer.topic=" + TOPIC})
@AutoConfigureMockMvc