From 2ed4e6d438d33a6669eb20526665acdc8fb62d21 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Nov 2022 15:40:35 +0100 Subject: [PATCH] Version des `spring-consumer` mit einem Dead-Letter-Topic MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Diese Version ist über die `application.yml` konfiguriert. * Über die `application.yml` kann keine Konfiguration erreicht werden, die sowolh die fachlichen Fehler nach erfolgter JSON-Deserialisierung, als auch die Poison Pills korrekt umleiten kann. --- README.sh | 8 ++++++ docker-compose.yml | 3 +++ src/main/java/de/juplo/kafka/Application.java | 25 ++++++++++++++++++- src/main/resources/application.yml | 3 +++ .../java/de/juplo/kafka/ApplicationIT.java | 4 ++- 5 files changed, 41 insertions(+), 2 deletions(-) diff --git a/README.sh b/README.sh index 1dcc2cd..07db086 100755 --- a/README.sh +++ b/README.sh @@ -40,9 +40,17 @@ echo "Writing poison pill..." echo 'BOOM!' | kafkacat -P -b :9092 -t test # end::poisonpill[] +echo "Writing logic error..." +# tag::logicerror[] +echo 66 | http -v :8080/peter?error=1 +# end::logicerror[] + echo 66 | http -v :8080/peter echo 7 | http -v :8080/klaus sleep 5 docker-compose stop consumer-1 consumer-2 docker-compose logs -f consumer-1 consumer-2 +# tag::kafkacat[] +kafkacat -b :9092 -t test.DLT -e -f 'p=%p|o=%o|%k=%s|h=%h\n' +# end::kafkacat[] diff --git a/docker-compose.yml b/docker-compose.yml index 3b6a145..d9a5507 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -90,8 +90,11 @@ services: command: > bash -c " kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test.DLT kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2 + kafka-topics --bootstrap-server kafka:9092 --create --topic test.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2 kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic test.DLT " depends_on: - kafka-1 diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 273cee5..f26d7a2 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -2,13 +2,36 @@ package de.juplo.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; @SpringBootApplication -@EnableConfigurationProperties(ApplicationProperties.class) +@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class Application { + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( + KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler( + DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + new FixedBackOff(0l, 0l)); + } + + public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 17e94ad..61a3f85 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -38,6 +38,9 @@ spring: spring.json.type.mapping: > ADD:de.juplo.kafka.MessageAddNumber, CALC:de.juplo.kafka.MessageCalculateSum + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java index 1baca99..8e931eb 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -7,6 +7,7 @@ import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.kafka.test.context.EmbeddedKafka; +import static de.juplo.kafka.ApplicationIT.DLT; import static de.juplo.kafka.ApplicationIT.TOPIC; @@ -15,10 +16,11 @@ import static de.juplo.kafka.ApplicationIT.TOPIC; properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "simple.consumer.topic=" + TOPIC }) -@EmbeddedKafka(topics = TOPIC) +@EmbeddedKafka(topics = { TOPIC, DLT }) public class ApplicationIT { public static final String TOPIC = "FOO"; + public static final String DLT = TOPIC + ".DLT"; @LocalServerPort private int port; -- 2.20.1