From f7d08b40e08ec4fb99c81c45224e081c7279ead4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Nov 2022 16:26:06 +0100 Subject: [PATCH] =?utf8?q?DLT-Konfig=20f=C3=BCr=20`spring-consumer`,=20die?= =?utf8?q?=20auch=20mit=20Poison=20Pills=20umgehen=20kann?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Damit der Producer, der die Nachrichten in das Dead-Letter-Topic schreibt, sowohl mit Deserialisierten Nachriten umgehen kann, die über den LongSerializer zu serialisieren sind, als auch mit Poison Pills, die unverändert als `byte[]` zu schreiben sind, muss ein `DelegatingByTypeSerializer` konfiguriert werden. * Dieser erwartet das Mapping im Konstruktor als Map und kann daher nicht über die `application.yml` konfiguriert werden! --- .../juplo/kafka/ApplicationConfiguration.java | 27 +++++++++++++++++++ src/main/resources/application.yml | 2 -- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0f18338..a9cd82a 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,16 +1,43 @@ package de.juplo.kafka; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.SeekUtils; +import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; + +import java.util.Map; @Configuration public class ApplicationConfiguration { + @Bean + public ProducerFactory producerFactory(KafkaProperties properties) + { + return new DefaultKafkaProducerFactory<>( + properties.buildProducerProperties(), + new StringSerializer(), + new DelegatingByTypeSerializer(Map.of( + Long.class, new LongSerializer(), + byte[].class, new ByteArraySerializer()))); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) + { + return new KafkaTemplate<>(producerFactory); + } + @Bean public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations kafkaTemplate) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4a8efd3..e248b95 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,8 +30,6 @@ spring: group-id: my-group properties: "[spring.deserializer.value.delegate.class]": org.apache.kafka.common.serialization.LongDeserializer - producer: - value-serializer: org.apache.kafka.common.serialization.LongSerializer logging: level: root: INFO -- 2.20.1