#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-messageconverter-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.0-json-SNAPSHOT'
+version = '1.0-messageconverter-SNAPSHOT'
java {
toolchain {
}
dependencies {
+ implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
- implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
- kafka-3
producer:
- image: juplo/spring-producer:1.0-json-SNAPSHOT
+ image: juplo/spring-producer:1.0-messageconverter-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: producer
<groupId>de.juplo.kafka</groupId>
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
- <description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>1.0-json-SNAPSHOT</version>
+ <description>A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka</description>
+ <version>1.0-messageconverter-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
package de.juplo.kafka;
-import org.apache.kafka.clients.producer.Producer;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
+import org.springframework.kafka.support.converter.JsonMessageConverter;
+import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
+import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
@Configuration
public ExampleProducer exampleProducer(
@Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
- Producer<String, SumupMessage> kafkaProducer,
+ KafkaProperties kafkaProperties,
+ KafkaTemplate<String, SumupMessage> kafkaTemplate,
ConfigurableApplicationContext applicationContext)
{
return
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
: properties.getProducerProperties().getThrottle(),
- kafkaProducer,
+ kafkaTemplate,
() -> applicationContext.close());
+
+ }
+
+ /**
+  * Provides the {@link KafkaTemplate} the producer sends through.
+  *
+  * The template is built from the auto-configured {@link ProducerFactory} and
+  * wired with the injected {@link JsonMessageConverter}, so payloads are
+  * converted to JSON byte arrays by the converter rather than by a
+  * JsonSerializer (the value-serializer is configured as ByteArraySerializer
+  * in the application configuration).
+  *
+  * @param producerFactory      auto-configured factory for Kafka producers
+  * @param jsonMessageConverter converter bean defined below
+  * @return the configured template
+  */
+ @Bean
+ public KafkaTemplate<String, SumupMessage> kafkaTemplate(
+ ProducerFactory<String, SumupMessage> producerFactory,
+ JsonMessageConverter jsonMessageConverter) {
+
+ KafkaTemplate<String, SumupMessage> template = new KafkaTemplate<>(producerFactory);
+ template.setMessageConverter(jsonMessageConverter);
+
+ return template;
+ }
- @Bean(destroyMethod = "")
- public Producer<?, ?> kafkaProducer(ProducerFactory<?, ?> producerFactory)
+ /**
+  * Message converter that serializes {@link SumupMessage} payloads to JSON
+  * byte arrays and records a short type ID ("ADD" / "CALC") in the message
+  * headers instead of the fully qualified class name.
+  *
+  * @param objectMapper the Spring-managed Jackson {@link ObjectMapper}
+  * @return the configured {@link ByteArrayJsonMessageConverter}
+  */
+ @Bean
+ public ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper)
{
- return producerFactory.createProducer();
+ // Pass the Spring-configured ObjectMapper to the converter instead of
+ // letting it create its own default one, so application-wide Jackson
+ // customizations apply (the injected parameter was previously unused).
+ ByteArrayJsonMessageConverter converter = new ByteArrayJsonMessageConverter(objectMapper);
+ DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
+
+ // Use a simple, short type ID instead of the fully qualified class name
+ typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
+ Map<String, Class<?>> typeMappings = new HashMap<>();
+ typeMappings.put("ADD", AddNumberMessage.class);
+ typeMappings.put("CALC", CalculateSumMessage.class);
+ typeMapper.setIdClassMapping(typeMappings);
+
+ converter.setTypeMapper(typeMapper);
+
+ return converter;
+ }
}
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
import java.time.Duration;
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, SumupMessage> producer;
+ private final KafkaTemplate<String, SumupMessage> kafkaTemplate;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, SumupMessage> producer,
+ KafkaTemplate<String, SumupMessage> kafkaTemplate,
Runnable closeCallback)
{
this.id = id;
this.topic = topic;
this.throttle = throttle;
- this.producer = producer;
+ this.kafkaTemplate = kafkaTemplate;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
workerThread.start();
}
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
}
}
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, SumupMessage> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
+ Message<SumupMessage> message = MessageBuilder
+ .withPayload(value)
+ .setHeader(KafkaHeaders.TOPIC, topic)
+ .build();
- producer.send(record, (metadata, e) ->
+ kafkaTemplate.send(message).whenComplete((result, e) ->
{
long now = System.currentTimeMillis();
if (e == null)
{
// HANDLE SUCCESS
+ RecordMetadata metadata = result.getRecordMetadata();
log.debug(
"{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
id,
buffer-memory: 33554432
batch-size: 16384
compression-type: gzip
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
properties:
- spring.json.type.mapping: >-
- ADD:de.juplo.kafka.AddNumberMessage,
- CALC:de.juplo.kafka.CalculateSumMessage
delivery-timeout: 10s
max-block: 5s
linger: 0
enabled: true
info:
kafka:
- bootstrap-server: ${juplo.bootstrap-server}
- client-id: ${juplo.client-id}
producer:
topic: ${juplo.producer.topic}
- acks: ${juplo.producer.acks}
- delivery-timeout: ${juplo.producer.delivery-timeout}
- max-block: ${juplo.producer.max-block}
- buffer-memory: ${juplo.producer.buffer-memory}
- batch-size: ${juplo.producer.batch-size}
- linger: ${juplo.producer.linger}
- compression-type: ${juplo.producer.compression-type}
throttle: ${juplo.producer.throttle}
logging:
level:
@SpringBootTest(
properties = {
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest",
"juplo.producer.topic=" + TOPIC})
@AutoConfigureMockMvc