DLT-Konfig für `spring-consumer`, die auch mit Poison Pills umgehen kann spring/spring-consumer--kafkalistener--long--dlt spring/spring-consumer--kafkalistener--long--dlt--2025-05-lvm--spickzettel spring/spring-consumer--kafkalistener--long--dlt--2025-05-signal-spickzettel
authorKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 15:26:06 +0000 (16:26 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 15 May 2025 20:57:12 +0000 (22:57 +0200)
* Damit der Producer, der die Nachrichten in das Dead-Letter-Topic schreibt,
  sowohl mit Deserialisierten Nachriten umgehen kann, die über den
  LongSerializer zu serialisieren sind, als auch mit Poison Pills, die
  unverändert als `byte[]` zu schreiben sind, muss ein
  `DelegatingByTypeSerializer` konfiguriert werden.
* Dieser erwartet das Mapping im Konstruktor als Map und kann daher nicht
  über die `application.yml` konfiguriert werden!

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml

index 0f18338..a9cd82a 100644 (file)
@@ -1,16 +1,43 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
 import org.springframework.kafka.listener.DefaultErrorHandler;
 import org.springframework.kafka.listener.SeekUtils;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+
+import java.util.Map;
 
 
 @Configuration
 public class ApplicationConfiguration
 {
+  @Bean
+  public ProducerFactory<String, Object> producerFactory(KafkaProperties properties)
+  {
+    return new DefaultKafkaProducerFactory<>(
+      properties.buildProducerProperties(),
+      new StringSerializer(),
+      new DelegatingByTypeSerializer(Map.of(
+        Long.class, new LongSerializer(),
+        byte[].class, new ByteArraySerializer())));
+  }
+
+  @Bean
+  public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory)
+  {
+    return new KafkaTemplate<>(producerFactory);
+  }
+
   @Bean
   public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations<?, ?> kafkaTemplate)
   {
index 4a8efd3..e248b95 100644 (file)
@@ -30,8 +30,6 @@ spring:
       group-id: my-group
       properties:
         "[spring.deserializer.value.delegate.class]": org.apache.kafka.common.serialization.LongDeserializer
-    producer:
-      value-serializer: org.apache.kafka.common.serialization.LongSerializer
 logging:
   level:
     root: INFO