From: Kai Moritz Date: Sun, 20 Nov 2022 14:40:35 +0000 (+0100) Subject: Version des `spring-consumer` mit einem Dead-Letter-Topic X-Git-Tag: springkafka/spring-consumer--kafkalistener--long--dlt--2026-03-21--smartlifecycle-only~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=2384bf37e58a88c2c2b0e5c070e11faaa9128275;p=demos%2Fkafka%2Ftraining Version des `spring-consumer` mit einem Dead-Letter-Topic * Diese Version ist kann fachliche Fehler umleiten. * Für Deserialisierungs-Fehler funktioniert die Umleitung noch nicht! --- diff --git a/README.sh b/README.sh index ba1f88e3..8d028797 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 33d14489..99594abf 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-kafkalistener-long-deserialization-error-SNAPSHOT' +version = '1.1-kafkalistener-long-dlt-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index e570b6bd..bfe22255 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -129,6 +129,7 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt else kafka-topics --bootstrap-server kafka:9092 \ --delete \ @@ -142,6 +143,18 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ + && kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic test-dlt + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic test-dlt \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \ && date > INITIALIZED fi stop_grace_period: 0s @@ -183,7 +196,7 @@ services: mem_limit: 100m consumer: - image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -192,7 +205,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -201,7 +214,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index 91d4304b..004d14f5 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-kafkalistener-long-deserialization-error-SNAPSHOT + 1.1-kafkalistener-long-dlt-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 0069257f..560de603 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -2,9 +2,12 @@ package de.juplo.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication +@EnableConfigurationProperties(KafkaProperties.class) public class Application { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 00000000..0f18338a --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,27 @@ +package de.juplo.kafka; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.listener.SeekUtils; + + +@Configuration +public class ApplicationConfiguration +{ + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + SeekUtils.DEFAULT_BACK_OFF); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6e2b7603..30dc025d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -41,6 +41,7 @@ public class ExampleConsumer String key, Long value) { + if (value % 66 == 0) throw new RuntimeException("BOOM: Fachlicher Fehler!"); consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e248b958..4a8efd34 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,6 +30,8 @@ spring: group-id: my-group properties: "[spring.deserializer.value.delegate.class]": org.apache.kafka.common.serialization.LongDeserializer + producer: + value-serializer: org.apache.kafka.common.serialization.LongSerializer logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 61197cbf..ae3eec88 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -10,6 +10,7 @@ import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.DLT; import static de.juplo.kafka.ApplicationTests.TOPIC; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; @@ -22,10 +23,11 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.consumer.topic=" + TOPIC }) @AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC, DLT }, partitions = PARTITIONS) public class ApplicationTests { static final String TOPIC = "FOO"; + public static final String DLT = TOPIC + "-dlt"; static final int PARTITIONS = 10; @Autowired