From: Kai Moritz Date: Thu, 15 May 2025 21:36:38 +0000 (+0200) Subject: DLT für den auf Message-Conversion basierenden `@KafkaHandler`-Consumer konfiguriert X-Git-Tag: springkafka/spring-consumer--messageconverter--dlt--2026-03-lvm~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=385260561ad2cab4654fc1c2618fa8d4364881de;p=demos%2Fkafka%2Ftraining DLT für den auf Message-Conversion basierenden `@KafkaHandler`-Consumer konfiguriert --- diff --git a/README.sh b/README.sh index bdeb8b22..c8d15314 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index e441be1f..5eee5f64 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-messageconverter-SNAPSHOT' +version = '1.1-messageconverter-dlt-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 02153807..b3d02c94 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -129,6 +129,7 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt else kafka-topics --bootstrap-server kafka:9092 \ --delete \ @@ -142,6 +143,18 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ + && kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic test-dlt + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic test-dlt \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \ && date > INITIALIZED fi stop_grace_period: 0s @@ -183,7 +196,7 @@ services: mem_limit: 100m consumer: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -192,7 +205,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -201,7 +214,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index da1b7055..5f6e59fa 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-messageconverter-SNAPSHOT + 1.1-messageconverter-dlt-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 9cae26d8..b81f25c3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -5,6 +5,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.converter.JacksonJsonMessageConverter; import org.springframework.kafka.support.mapping.DefaultJacksonJavaTypeMapper; import org.springframework.kafka.support.mapping.JacksonJavaTypeMapper; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.SeekUtils; import java.util.HashMap; import java.util.Map; @@ -33,4 +37,18 @@ public class ApplicationConfiguration return converter; } + + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + SeekUtils.DEFAULT_BACK_OFF); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8ca8a1bd..e4897bd5 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -28,6 +28,8 @@ spring: consumer: group-id: my-group value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer + producer: + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer level: root: INFO de.juplo: DEBUG