From cffe1105a3d11db9a50815d2e31b9ed1cd2eb1b1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Feb 2025 14:29:40 +0100 Subject: [PATCH] =?utf8?q?Versand=20=C3=BCber=20das=20`KafkaTemplate`=20mi?= =?utf8?q?t=20dem=20`MessageCovnerter`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Verwendung des `ByteArraySerializer` * Type-Mappings für den `StringJsonMessageConverter` konfiguriert --- README.sh | 2 +- build.gradle | 4 +- docker/docker-compose.yml | 6 +- pom.xml | 4 +- .../juplo/kafka/ApplicationConfiguration.java | 68 +++++++++++-------- .../de/juplo/kafka/ApplicationProperties.java | 23 ------- .../java/de/juplo/kafka/ExampleProducer.java | 27 ++++---- src/main/resources/application.yml | 32 ++++----- .../java/de/juplo/kafka/ApplicationTests.java | 2 - 9 files changed, 77 insertions(+), 91 deletions(-) diff --git a/README.sh b/README.sh index 982f7bd3..c208c12f 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-messageconverter-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 7556511b..2d30d50d 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-json-SNAPSHOT' +version = '1.0-messageconverter-SNAPSHOT' java { toolchain { @@ -27,10 +27,10 @@ repositories { } dependencies { + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' - implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2e623598..fe5faf26 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,10 +136,10 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-json-SNAPSHOT + image: juplo/spring-producer:1.0-messageconverter-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: producer + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer juplo.producer.topic: test consumer: diff --git a/pom.xml b/pom.xml index 7bcb24cd..9c934922 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ de.juplo.kafka spring-producer Spring Producer - A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-json-SNAPSHOT + A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka + 1.0-messageconverter-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index d806efac..02db4842 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,16 +1,21 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter; +import org.springframework.kafka.support.converter.JsonMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; import java.time.Duration; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; @Configuration @@ -20,39 +25,48 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, - Producer kafkaProducer, + KafkaProperties kafkaProperties, + KafkaTemplate kafkaTemplate, ConfigurableApplicationContext applicationContext) { return new ExampleProducer( - properties.getClientId(), + kafkaProperties.getClientId(), properties.getProducerProperties().getTopic(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), - kafkaProducer, + kafkaTemplate, () -> applicationContext.close()); + + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory, + JsonMessageConverter jsonMessageConverter) { + + KafkaTemplate template = new KafkaTemplate<>(producerFactory); + template.setMessageConverter(jsonMessageConverter); + + return template; } - @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + @Bean + public ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", JsonSerializer.class.getName()); - props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.AddNumberMessage,CALC:de.juplo.kafka.CalculateSumMessage"); - - return new KafkaProducer<>(props); + ByteArrayJsonMessageConverter converter = new ByteArrayJsonMessageConverter(); + DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); + + // Verwende eine einfache, kurze Type-ID anstatt FQN + typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); + Map> typeMappings = new HashMap<>(); + typeMappings.put("ADD", AddNumberMessage.class); + typeMappings.put("CALC", CalculateSumMessage.class); + typeMapper.setIdClassMapping(typeMappings); + + converter.setTypeMapper(typeMapper); + + return converter; } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 43232628..908072cb 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -16,13 +16,6 @@ import java.time.Duration; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull private ProducerProperties producer; @@ -41,22 +34,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Duration deliveryTimeout; - @NotNull - private Duration maxBlock; - @NotNull - private Long bufferMemory; - @NotNull - private Integer batchSize; - @NotNull - private Duration linger; - @NotNull - @NotEmpty - private String compressionType; private Duration throttle; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 1a06aa86..f3e1e490 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,8 +1,11 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import java.time.Duration; @@ -13,7 +16,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private final Thread workerThread; private final Runnable closeCallback; @@ -25,13 +28,13 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + KafkaTemplate kafkaTemplate, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; - this.producer = producer; + this.kafkaTemplate = kafkaTemplate; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); workerThread.start(); @@ -77,8 +80,6 @@ public class ExampleProducer implements Runnable } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } @@ -87,18 +88,18 @@ public class ExampleProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); + Message message = MessageBuilder + .withPayload(value) + .setHeader(KafkaHeaders.TOPIC, topic) + .build(); - producer.send(record, (metadata, e) -> + kafkaTemplate.send(message).whenComplete((result, e) -> { long now = System.currentTimeMillis(); if (e == null) { // HANDLE SUCCESS + RecordMetadata metadata = result.getRecordMetadata(); produced++; log.debug( "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea1284..5c711809 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,16 +1,21 @@ juplo: - bootstrap-server: :9092 - client-id: DEV producer: topic: test - acks: -1 - delivery-timeout: 10s - max-block: 5s - buffer-memory: 33554432 - batch-size: 16384 - linger: 0 - compression-type: gzip throttle: 500 +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + producer: + acks: -1 + buffer-memory: 33554432 + batch-size: 16384 + compression-type: gzip + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer + properties: + delivery-timeout: 10s + max-block: 5s + linger: 0 management: endpoint: shutdown: @@ -26,17 +31,8 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - delivery-timeout: ${juplo.producer.delivery-timeout} - max-block: ${juplo.producer.max-block} - buffer-memory: ${juplo.producer.buffer-memory} - batch-size: ${juplo.producer.batch-size} - linger: ${juplo.producer.linger} - compression-type: ${juplo.producer.compression-type} throttle: ${juplo.producer.throttle} logging: level: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 29ca80a7..7687e9ca 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,9 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.producer.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -- 2.20.1