From: Kai Moritz Date: Thu, 15 May 2025 21:36:38 +0000 (+0200) Subject: DLT für den auf Message-Conversion basierenden `@KafkaHandler`-Consumer konfiguriert X-Git-Tag: spring/spring-consumer--messageconverter--dlt--2025-05-lvm--spickzettel~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=8d75ae87846b9ac54741648ed90ede4f220edf2a;p=demos%2Fkafka%2Ftraining DLT für den auf Message-Conversion basierenden `@KafkaHandler`-Consumer konfiguriert --- diff --git a/README.sh b/README.sh index bdeb8b2..c8d1531 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 7968723..d8a228f 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-messageconverter-SNAPSHOT' +version = '1.1-messageconverter-dlt-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 8c411dd..64976c2 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -95,6 +95,7 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt else kafka-topics --bootstrap-server kafka:9092 \ --delete \ @@ -108,6 +109,18 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ + && kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic test-dlt + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic test-dlt \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \ && date > INITIALIZED fi stop_grace_period: 0s @@ -143,7 +156,7 @@ services: juplo.producer.topic: test consumer: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -152,7 +165,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -161,7 +174,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index 2b90634..f06056c 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-messageconverter-SNAPSHOT + 1.1-messageconverter-dlt-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 463621a..ba86745 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,6 +3,10 @@ package de.juplo.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.SeekUtils; import org.springframework.kafka.support.converter.JsonMessageConverter; import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; @@ -34,4 +38,18 @@ public class ApplicationConfiguration return converter; } + + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + SeekUtils.DEFAULT_BACK_OFF); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8ca8a1b..e4897bd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -28,6 +28,8 @@ spring: consumer: group-id: my-group value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer + producer: + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer level: root: INFO de.juplo: DEBUG