From: Kai Moritz Date: Thu, 11 Jun 2026 15:39:11 +0000 (+0200) Subject: Serialisierung fachlicher Nachrichten mit dem `MessageConverter` X-Git-Tag: springkafka/spring-producer--messageconverter--2026-06-lvm X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2573ed534eeeaf3037bb46b753b6aecf70a04731;p=demos%2Fkafka%2Ftraining Serialisierung fachlicher Nachrichten mit dem `MessageConverter` --- diff --git a/README.sh b/README.sh index 04f0a01b..b9d8b891 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,5 @@ #!/bin/bash - -IMAGE=juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT +IMAGE=juplo/spring-producer:2.0-messageconverter-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index f0fb593e..947bbd34 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'de.juplo.kafka' -version = '2.0-kafkatemplate-SNAPSHOT' +version = '2.0-messageconverter-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index bbb43524..27207168 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -173,7 +173,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT + image: juplo/spring-producer:2.0-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: producer diff --git a/pom.xml b/pom.xml index ec2f3f92..59131858 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka - 2.0-kafkatemplate-SNAPSHOT + 2.0-messageconverter-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java new file mode 100644 index 00000000..deb6350e --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class AddNumberMessage implements SumupMessage +{ + private final int number; + private final int next; +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index f7ad6594..aac39623 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -6,8 +6,15 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.converter.JacksonJsonMessageConverter; +import org.springframework.kafka.support.converter.StringJacksonJsonMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJacksonJavaTypeMapper; +import org.springframework.kafka.support.mapping.JacksonJavaTypeMapper; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; @Configuration @@ -18,7 +25,7 @@ public class ApplicationConfiguration public ExampleProducer exampleProducer( @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, - KafkaTemplate kafkaTemplate, + KafkaTemplate kafkaTemplate, ConfigurableApplicationContext applicationContext) { return @@ -30,5 +37,35 @@ public class ApplicationConfiguration : properties.getProducerProperties().getThrottle(), kafkaTemplate, () -> applicationContext.close()); + + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory, + JacksonJsonMessageConverter jacksonJsonMessageConverter) { + + KafkaTemplate template = new KafkaTemplate<>(producerFactory); + template.setMessageConverter(jacksonJsonMessageConverter); + + return template; + } + + @Bean + public StringJacksonJsonMessageConverter jacksonJsonMessageConverter() + { + StringJacksonJsonMessageConverter converter = new StringJacksonJsonMessageConverter(); + DefaultJacksonJavaTypeMapper typeMapper = new DefaultJacksonJavaTypeMapper(); + + // Verwende eine einfache, kurze Type-ID anstatt FQN + typeMapper.setTypePrecedence(JacksonJavaTypeMapper.TypePrecedence.TYPE_ID); + Map> typeMappings = new HashMap<>(); + typeMappings.put("ADD", AddNumberMessage.class); + typeMappings.put("CALC", CalculateSumMessage.class); + typeMapper.setIdClassMapping(typeMappings); + + converter.setTypeMapper(typeMapper); + + return converter; } } diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java new file mode 100644 index 00000000..6aa0121a --- /dev/null +++ b/src/main/java/de/juplo/kafka/CalculateSumMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class CalculateSumMessage implements SumupMessage +{ + private final int number; +} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 1e0ec3e7..8f52ce4c 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -3,6 +3,9 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import java.time.Duration; @@ -13,7 +16,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +28,7 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - KafkaTemplate kafkaTemplate, + KafkaTemplate kafkaTemplate, Runnable closeCallback) { this.id = id; @@ -49,7 +52,12 @@ public class ExampleProducer implements Runnable { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + int number = (int) i % 10; + SumupMessage message = (i % 7 == 0) + ? new CalculateSumMessage(number) + : new AddNumberMessage(number, (int)i); + + send(Long.toString(number), message); if (throttle.isPositive()) { @@ -76,11 +84,17 @@ public class ExampleProducer implements Runnable } } - void send(String key, String value) + void send(String key, SumupMessage value) { final long sendRequested = System.currentTimeMillis(); - kafkaTemplate.send(topic, key, value).whenComplete((result, e) -> + Message message = MessageBuilder + .withPayload(value) + .setHeader(KafkaHeaders.TOPIC, topic) + .setHeader(KafkaHeaders.KEY, key) + .build(); + + kafkaTemplate.send(message).whenComplete((result, e) -> { long sendRequestProcessed = System.currentTimeMillis(); if (e == null) diff --git a/src/main/java/de/juplo/kafka/SumupMessage.java b/src/main/java/de/juplo/kafka/SumupMessage.java new file mode 100644 index 00000000..739efd1e --- /dev/null +++ b/src/main/java/de/juplo/kafka/SumupMessage.java @@ -0,0 +1,5 @@ +package de.juplo.kafka; + +public interface SumupMessage +{ +}