From e0bf8dd79190b97f8a7dde3dbe33eb3ddcbe9235 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Feb 2025 14:29:40 +0100 Subject: [PATCH] =?utf8?q?Versand=20=C3=BCber=20das=20`KafkaTemplate`=20mi?= =?utf8?q?t=20dem=20`MessageCovnerter`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Verwendung des `ByteArraySerializer` * Type-Mappings für den `StringJsonMessageConverter` konfiguriert --- README.sh | 2 +- build.gradle | 4 +- docker/docker-compose.yml | 2 +- pom.xml | 4 +- .../juplo/kafka/ApplicationConfiguration.java | 45 ++++++++++++++++--- .../java/de/juplo/kafka/ExampleProducer.java | 27 +++++------ src/main/resources/application.yml | 14 +----- .../java/de/juplo/kafka/ApplicationTests.java | 1 - 8 files changed, 60 insertions(+), 39 deletions(-) diff --git a/README.sh b/README.sh index 982f7bd3..c208c12f 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-messageconverter-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 7556511b..2d30d50d 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-json-SNAPSHOT' +version = '1.0-messageconverter-SNAPSHOT' java { toolchain { @@ -27,10 +27,10 @@ repositories { } dependencies { + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' - implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index ac00f3c6..fe5faf26 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,7 +136,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-json-SNAPSHOT + image: juplo/spring-producer:1.0-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: producer diff --git a/pom.xml b/pom.xml index 7bcb24cd..9c934922 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ de.juplo.kafka spring-producer Spring Producer - A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-json-SNAPSHOT + A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka + 1.0-messageconverter-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index e212a253..57fa246f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,14 +1,22 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.Producer; +import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter; +import org.springframework.kafka.support.converter.JsonMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; @Configuration @@ -19,7 +27,8 @@ public class ApplicationConfiguration public ExampleProducer exampleProducer( @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, - Producer kafkaProducer, + KafkaProperties kafkaProperties, + KafkaTemplate kafkaTemplate, ConfigurableApplicationContext applicationContext) { return @@ -29,13 +38,37 @@ public class ApplicationConfiguration properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), - kafkaProducer, + kafkaTemplate, () -> applicationContext.close()); + + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory, + JsonMessageConverter jsonMessageConverter) { + + KafkaTemplate template = new KafkaTemplate<>(producerFactory); + template.setMessageConverter(jsonMessageConverter); + + return template; } - @Bean(destroyMethod = "") - public Producer kafkaProducer(ProducerFactory producerFactory) + @Bean + public ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper) { - return producerFactory.createProducer(); + ByteArrayJsonMessageConverter converter = new ByteArrayJsonMessageConverter(); + DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); + + // Verwende eine einfache, kurze Type-ID anstatt FQN + typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); + Map> typeMappings = new HashMap<>(); + typeMappings.put("ADD", AddNumberMessage.class); + typeMappings.put("CALC", CalculateSumMessage.class); + typeMapper.setIdClassMapping(typeMappings); + + converter.setTypeMapper(typeMapper); + + return converter; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0d152f77..9a262b1c 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,8 +1,11 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import java.time.Duration; @@ -13,7 +16,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private final Thread workerThread; private final Runnable closeCallback; @@ -25,13 +28,13 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + KafkaTemplate kafkaTemplate, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; - this.producer = producer; + this.kafkaTemplate = kafkaTemplate; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); workerThread.start(); @@ -77,8 +80,6 @@ public class ExampleProducer implements Runnable } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } @@ -87,18 +88,18 @@ public class ExampleProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); + Message message = MessageBuilder + .withPayload(value) + .setHeader(KafkaHeaders.TOPIC, topic) + .build(); - producer.send(record, (metadata, e) -> + kafkaTemplate.send(message).whenComplete((result, e) -> { long now = System.currentTimeMillis(); if (e == null) { // HANDLE SUCCESS + RecordMetadata metadata = result.getRecordMetadata(); log.debug( "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", id, diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index daf440e9..5c711809 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,11 +11,8 @@ spring: buffer-memory: 33554432 batch-size: 16384 compression-type: gzip - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer properties: - spring.json.type.mapping: >- - ADD:de.juplo.kafka.AddNumberMessage, - CALC:de.juplo.kafka.CalculateSumMessage delivery-timeout: 10s max-block: 5s linger: 0 @@ -34,17 +31,8 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - delivery-timeout: ${juplo.producer.delivery-timeout} - max-block: ${juplo.producer.max-block} - buffer-memory: ${juplo.producer.buffer-memory} - batch-size: ${juplo.producer.batch-size} - linger: ${juplo.producer.linger} - compression-type: ${juplo.producer.compression-type} throttle: ${juplo.producer.throttle} logging: level: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 3a4493ec..7687e9ca 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,7 +27,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", "juplo.producer.topic=" + TOPIC}) @AutoConfigureMockMvc -- 2.20.1