Version des `spring-consumer` mit einem Dead-Letter-Topic
authorKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 14:40:35 +0000 (15:40 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 15:38:34 +0000 (16:38 +0100)
* Diese Version ist über die `application.yml` konfiguriert.
* Über die `application.yml` kann keine Konfiguration erreicht werden, die
  sowolh die fachlichen Fehler nach erfolgter JSON-Deserialisierung, als
  auch die Poison Pills korrekt umleiten kann.

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationIT.java

index 1dcc2cd..07db086 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -40,9 +40,17 @@ echo "Writing poison pill..."
 echo 'BOOM!' | kafkacat -P -b :9092 -t test
 # end::poisonpill[]
 
+echo "Writing logic error..."
+# tag::logicerror[]
+echo 66 | http -v :8080/peter?error=1
+# end::logicerror[]
+
 echo 66 | http -v :8080/peter
 echo 7  | http -v :8080/klaus
 
 sleep 5
 docker-compose stop consumer-1 consumer-2
 docker-compose logs -f consumer-1 consumer-2
+# tag::kafkacat[]
+kafkacat -b :9092 -t test.DLT -e -f 'p=%p|o=%o|%k=%s|h=%h\n'
+# end::kafkacat[]
index 3b6a145..d9a5507 100644 (file)
@@ -90,8 +90,11 @@ services:
     command: >
       bash -c "
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test.DLT
         kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2
         kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic test.DLT
       "
     depends_on:
       - kafka-1
index 273cee5..f26d7a2 100644 (file)
@@ -2,13 +2,36 @@ package de.juplo.kafka;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
 
 
 @SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class Application
 {
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+    KafkaOperations<?, ?> kafkaTemplate)
+  {
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
+  }
+
+  @Bean
+  public DefaultErrorHandler errorHandler(
+    DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(
+      recoverer,
+      new FixedBackOff(0l, 0l));
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index 17e94ad..61a3f85 100644 (file)
@@ -38,6 +38,9 @@ spring:
         spring.json.type.mapping: >
           ADD:de.juplo.kafka.MessageAddNumber,
           CALC:de.juplo.kafka.MessageCalculateSum
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
 logging:
   level:
     root: INFO
index 1baca99..8e931eb 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.boot.test.web.client.TestRestTemplate;
 import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
+import static de.juplo.kafka.ApplicationIT.DLT;
 import static de.juplo.kafka.ApplicationIT.TOPIC;
 
 
@@ -15,10 +16,11 @@ import static de.juplo.kafka.ApplicationIT.TOPIC;
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "simple.consumer.topic=" + TOPIC })
-@EmbeddedKafka(topics = TOPIC)
+@EmbeddedKafka(topics = { TOPIC, DLT })
 public class ApplicationIT
 {
   public static final String TOPIC = "FOO";
+  public static final String DLT = TOPIC + ".DLT";
 
   @LocalServerPort
   private int port;