From e829a33be8a6eaf555a75ede6dc125750fa4032d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Nov 2022 15:40:35 +0100 Subject: [PATCH] Version des `spring-consumer` mit einem Dead-Letter-Topic MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Diese Version ist kann fachliche Fehler umleiten. * Für Deserialisierungs-Fehler funktioniert die Umleitung noch nicht! --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 19 ++++++++++--- pom.xml | 2 +- src/main/java/de/juplo/kafka/Application.java | 3 +++ .../juplo/kafka/ApplicationConfiguration.java | 27 +++++++++++++++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 1 + src/main/resources/application.yml | 2 ++ .../java/de/juplo/kafka/ApplicationTests.java | 4 ++- 9 files changed, 55 insertions(+), 7 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java diff --git a/README.sh b/README.sh index ba1f88e..8d02879 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 0011a97..88ad243 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-kafkalistener-long-deserialization-error-SNAPSHOT' +version = '1.1-kafkalistener-long-dlt-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 58da0d0..f3d8a29 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -95,6 +95,7 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt else kafka-topics --bootstrap-server kafka:9092 \ --delete \ @@ -108,6 +109,18 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ + && kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic test-dlt + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic test-dlt \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \ && date > INITIALIZED fi stop_grace_period: 0s @@ -143,7 +156,7 @@ services: juplo.producer.topic: test consumer: - image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -152,7 +165,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -161,7 +174,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index 24caf6a..503c165 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-kafkalistener-long-deserialization-error-SNAPSHOT + 1.1-kafkalistener-long-dlt-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 0069257..560de60 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -2,9 +2,12 @@ package de.juplo.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication +@EnableConfigurationProperties(KafkaProperties.class) public class Application { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000..0f18338 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,27 @@ +package de.juplo.kafka; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.SeekUtils; + + +@Configuration +public class ApplicationConfiguration +{ + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + SeekUtils.DEFAULT_BACK_OFF); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6e2b760..30dc025 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -41,6 +41,7 @@ public class ExampleConsumer String key, Long value) { + if (value % 66 == 0) throw new RuntimeException("BOOM: Fachlicher Fehler!"); consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e248b95..4a8efd3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,6 +30,8 @@ spring: group-id: my-group properties: "[spring.deserializer.value.delegate.class]": org.apache.kafka.common.serialization.LongDeserializer + producer: + value-serializer: org.apache.kafka.common.serialization.LongSerializer logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ae119bf..6e9f484 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -10,6 +10,7 @@ import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.DLT; import static de.juplo.kafka.ApplicationTests.TOPIC; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; @@ -22,10 +23,11 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.consumer.topic=" + TOPIC }) @AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC, DLT }, partitions = PARTITIONS) public class ApplicationTests { static final String TOPIC = "FOO"; + public static final String DLT = TOPIC + "-dlt"; static final int PARTITIONS = 10; @Autowired -- 2.20.1