From 2eb00a6cce8be4588119915ed9623fbd1bab39d7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Nov 2022 16:26:06 +0100 Subject: [PATCH] =?utf8?q?DLT-Konfig=20f=C3=BCr=20`spring-consumer`,=20die?= =?utf8?q?=20auch=20mit=20Poison=20Pills=20umgehen=20kann?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Damit der Producer, der die Nachrichten in das Dead-Letter-Topic schreibt, sowohl mit Deserialisierten Nachriten umgehen kann, die über den JsonSerializer zu serialisieren sind, als auch mit Poison Pills, die unverändert als `byte[]` zu schreiben sind, muss ein `DelegatingByTypeSerializer` konfiguriert werden. * Dieser erwartet das Mapping im Konstruktor als Map und kann daher nicht (oder nicht so leicht?) über die `application.yml` konfiguriert werden! --- src/main/java/de/juplo/kafka/Application.java | 33 +++++++++++++++++++ src/main/resources/application.yml | 3 -- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index f26d7a2..0360f68 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,20 +1,53 @@ package de.juplo.kafka; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.backoff.FixedBackOff; +import java.util.Map; + @SpringBootApplication @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class Application { + @Bean + public ProducerFactory producerFactory( + KafkaProperties properties) + { + Map map = Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + return new DefaultKafkaProducerFactory<>( + map, + new StringSerializer(), + new DelegatingByTypeSerializer( + Map.of( + byte[].class, new ByteArraySerializer(), + MessageAddNumber.class, new JsonSerializer<>(), + MessageCalculateSum.class, new JsonSerializer<>()))); + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory) + { + return new KafkaTemplate<>(producerFactory); + } + @Bean public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( KafkaOperations kafkaTemplate) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 61a3f85..17e94ad 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -38,9 +38,6 @@ spring: spring.json.type.mapping: > ADD:de.juplo.kafka.MessageAddNumber, CALC:de.juplo.kafka.MessageCalculateSum - producer: - key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer logging: level: root: INFO -- 2.20.1