]> juplo.de Git - demos/kafka/training/commitdiff
DLT für den auf Message-Conversion basierenden `@KafkaHandler`-Consumer konfiguriert
authorKai Moritz <kai@juplo.de>
Thu, 15 May 2025 21:36:38 +0000 (23:36 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:51:54 +0000 (21:51 +0100)
README.sh
build.gradle
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml

index bdeb8b22f112fefabffab199ff04688cf53427e8..c8d15314118a1fcb4b0fab99c3fefafcd17ae318 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index e441be1fc1d7e72c4086135abdcf1984241aa1b4..5eee5f64ff4b84fa20ad70219243f526e70fb7f2 100644 (file)
@@ -8,7 +8,7 @@ plugins {
 }
 
 group = 'de.juplo.kafka'
-version = '1.1-messageconverter-SNAPSHOT'
+version = '1.1-messageconverter-dlt-SNAPSHOT'
 
 java {
        toolchain {
index 0215380772209c229a1086bca704bd7b30aeeee5..b3d02c9445b0d493510c8b4dcf3427aa43e645ec 100644 (file)
@@ -129,6 +129,7 @@ services:
           echo -n Bereits konfiguriert:
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+          kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt
         else
           kafka-topics --bootstrap-server kafka:9092 \
                        --delete \
@@ -142,6 +143,18 @@ services:
                        --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
+          && kafka-topics --bootstrap-server kafka:9092 \
+                       --delete \
+                       --if-exists \
+                       --topic test-dlt
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --create \
+                       --topic test-dlt \
+                       --partitions 2 \
+                       --replication-factor 3 \
+                       --config min.insync.replicas=2 \
+          && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \
+          && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \
           && date > INITIALIZED
         fi
     stop_grace_period: 0s
@@ -183,7 +196,7 @@ services:
     mem_limit:  100m
 
   consumer:
-    image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+    image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: consumer
@@ -192,7 +205,7 @@ services:
       juplo.consumer.topic: test
 
   peter:
-    image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+    image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: peter
@@ -201,7 +214,7 @@ services:
       juplo.consumer.topic: test
 
   ute:
-    image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+    image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: ute
diff --git a/pom.xml b/pom.xml
index 8702d54dfa8d7cddbfa648e31eea64d53190fb0c..a7ed3423ae63c05b4dfecb6d9bbe46cf161cd526 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-messageconverter-SNAPSHOT</version>
+  <version>1.1-messageconverter-dlt-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index 463621a8f90250de36aa1f7f74061261775e3d7f..ba86745a953abe7f727b219427addb66edf3b636 100644 (file)
@@ -3,6 +3,10 @@ package de.juplo.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.listener.SeekUtils;
 import org.springframework.kafka.support.converter.JsonMessageConverter;
 import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
 import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
@@ -34,4 +38,18 @@ public class ApplicationConfiguration
 
     return converter;
   }
+
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations<?, ?> kafkaTemplate)
+  {
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
+  }
+
+  @Bean
+  public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(
+      recoverer,
+      SeekUtils.DEFAULT_BACK_OFF);
+  }
 }
index 8ca8a1bda6efeee778978d8b1a8c6a37a118b317..e4897bd5775d42d2aa7616ef1e669251c89f4abd 100644 (file)
@@ -28,6 +28,8 @@ spring:
     consumer:
       group-id: my-group
       value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
+    producer:
+      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
   level:
     root: INFO
     de.juplo: DEBUG