From e0ee20b45671ea6ad106f3bc9c36e1ae9871caac Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 15 May 2025 23:36:38 +0200 Subject: [PATCH] =?utf8?q?DLT=20f=C3=BCr=20den=20auf=20Message-Conversion?= =?utf8?q?=20basierenden=20`@KafkaHandler`-Consumer=20konfiguriert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 19 ++++++++++++++++--- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 18 ++++++++++++++++++ src/main/resources/application.yml | 2 ++ 6 files changed, 39 insertions(+), 6 deletions(-) diff --git a/README.sh b/README.sh index bdeb8b2..c8d1531 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 7968723..d8a228f 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-messageconverter-SNAPSHOT' +version = '1.1-messageconverter-dlt-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 8c411dd..64976c2 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -95,6 +95,7 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt else kafka-topics --bootstrap-server kafka:9092 \ --delete \ @@ -108,6 +109,18 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ + && kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic test-dlt + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic test-dlt \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \ && date > INITIALIZED fi stop_grace_period: 0s @@ -143,7 +156,7 @@ services: juplo.producer.topic: test consumer: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -152,7 +165,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -161,7 +174,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index 2b90634..f06056c 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-messageconverter-SNAPSHOT + 1.1-messageconverter-dlt-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 463621a..ba86745 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,6 +3,10 @@ package de.juplo.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.SeekUtils; import org.springframework.kafka.support.converter.JsonMessageConverter; import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; @@ -34,4 +38,18 @@ public class ApplicationConfiguration return converter; } + + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + SeekUtils.DEFAULT_BACK_OFF); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8ca8a1b..e4897bd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -28,6 +28,8 @@ spring: consumer: group-id: my-group value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer + producer: + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer level: root: INFO de.juplo: DEBUG -- 2.20.1