DLT configured based on the `DeadLetterPublishingRecoverer`
author    Kai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 18:41:06 +0000 (20:41 +0200)
committer Kai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 19:13:02 +0000 (21:13 +0200)
* The `DeadLetterPublishingRecoverer` has to be instantiated explicitly.
* To make it known to the Spring-Kafka beans, the `DefaultErrorHandler`
  bean has to be overridden.
* The recoverer is passed to the handler together with a back-off
  strategy (see the condensed sketch below).
* For the `DeadLetterPublishingRecoverer` to be able to send the
  messages that are to be forwarded, a `KafkaTemplate` backed by a
  matching `ProducerFactory` has to be provided.
* The producer apparently needs a separate `bootstrap-servers` entry
  under `spring.kafka.producer`. The entry
  `spring.kafka.bootstrap-servers` is not picked up here!

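For orientation, a condensed sketch of that wiring (the commit itself splits it into separate beans, as shown in the diff of `ApplicationConfiguration.java` below). The `.DLT` routing is Spring-Kafka's default behaviour, not something configured in this commit: the recoverer publishes a failed record to `<original topic>.DLT` and to the *same* partition, which is why `docker-compose.yml` creates `out.DLT` with the same number of partitions.

    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<?, ?> kafkaTemplate)
    {
      // Forwards failed records to <original topic>.DLT (same partition)
      DeadLetterPublishingRecoverer recoverer =
          new DeadLetterPublishingRecoverer(kafkaTemplate);
      // FixedBackOff(0L, 0L): no delay, zero retries -- the failed record
      // is handed to the recoverer immediately
      return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
    }
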
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/GenericApplicationTests.java

diff --git a/docker-compose.yml b/docker-compose.yml
index 16fec5b..960bbc2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -85,10 +85,13 @@ services:
       bash -c "
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT
         kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
         kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2  
+        kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2  
         kafka-topics --bootstrap-server kafka:9092 --describe --topic in
         kafka-topics --bootstrap-server kafka:9092 --describe --topic out
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT
       "
 
   cli:
@@ -130,6 +133,7 @@ services:
     environment:
       server.port: 8080
       spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.producer.bootstrap-servers: kafka:9092
       spring.kafka.client-id: adder-1
       spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
@@ -144,6 +148,7 @@ services:
     environment:
       server.port: 8080
       spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.producer.bootstrap-servers: kafka:9092
       spring.kafka.client-id: adder-2
       spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
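
Once the topic exists, the records forwarded by the recoverer can be inspected from the `cli` container; a sketch using the Confluent tooling already present in the compose file (`print.headers` makes the `kafka_dlt-*` headers added by the recoverer visible):

    kafka-console-consumer \
      --bootstrap-server kafka:9092 \
      --topic out.DLT \
      --from-beginning \
      --property print.headers=true
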
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index c09eec3..b5f6187 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,13 +1,25 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Map;
 import java.util.Optional;
 
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
 
 
 @Configuration
@@ -58,4 +70,41 @@ public class ApplicationConfiguration
             endpointRegistry,
             recordHandler);
   }
+
+  @Bean
+  public ProducerFactory<String, Object> producerFactory(
+      KafkaProperties properties)
+  {
+    return new DefaultKafkaProducerFactory<>(
+        properties.getProducer().buildProperties(),
+        new StringSerializer(),
+        new DelegatingByTypeSerializer(
+            Map.of(
+                byte[].class, new ByteArraySerializer(),
+                MessageAddNumber.class, new JsonSerializer<>(),
+                MessageCalculateSum.class, new JsonSerializer<>())));
+  }
+
+  @Bean
+  public KafkaTemplate<String, Object> kafkaTemplate(
+      ProducerFactory<String, Object> producerFactory)
+  {
+    return new KafkaTemplate<>(producerFactory);
+  }
+
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+      KafkaOperations<?, ?> kafkaTemplate)
+  {
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
+  }
+
+  @Bean
+  public DefaultErrorHandler errorHandler(
+      DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(
+        recoverer,
+        new FixedBackOff(0L, 0L));
+  }
 }
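
The `DelegatingByTypeSerializer` above is needed because the same producer now serves two purposes: a record that failed deserialization reaches the recoverer as its raw `byte[]` payload and must be forwarded as-is, while the regular results are still plain message objects. A sketch of the effect (the `send()` calls are illustrative only, not part of the commit):

    // Poison pill: the recoverer forwards the original, undecodable bytes
    kafkaTemplate.send("out.DLT", "key", new byte[] { 0x23, 0x42 }); // -> ByteArraySerializer
    // Regular result (a MessageAddNumber / MessageCalculateSum instance):
    // still serialized as JSON
    kafkaTemplate.send("out", "key", message);                       // -> JsonSerializer
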
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 92f3a6b..0bc592c 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -34,10 +34,17 @@ spring:
       auto-offset-reset: earliest
       auto-commit-interval: 5s
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
         partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
         metadata.max.age.ms: 1000
+        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+        spring.json.type.mapping: >
+          ADD:de.juplo.kafka.MessageAddNumber,
+          CALC:de.juplo.kafka.MessageCalculateSum
+    producer:
+      bootstrap-servers: :9092
+      properties:
         spring.json.type.mapping: >
           ADD:de.juplo.kafka.MessageAddNumber,
           CALC:de.juplo.kafka.MessageCalculateSum
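
The consumer side of the same change, expressed programmatically as a sketch (the application configures this through the YAML above): the `ErrorHandlingDeserializer` wraps the actual `JsonDeserializer`, catches anything it throws, and attaches the failure to the record as a header, so that the `DefaultErrorHandler` can route the poison pill to the DLT instead of the container failing on it.

    Map<String, Object> props = Map.of(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class,       // catches deserialization errors
        ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
            JsonDeserializer.class,                // does the actual work
        JsonDeserializer.TYPE_MAPPINGS,
            "ADD:de.juplo.kafka.MessageAddNumber," +
            "CALC:de.juplo.kafka.MessageCalculateSum");
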
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 4793d96..b98066f 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
                properties = {
                                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "sumup.adder.topic=" + TOPIC,
                                "spring.kafka.consumer.auto-commit-interval=500ms",
                                "spring.mongodb.embedded.version=4.4.13" })
@@ -124,32 +124,29 @@ abstract class GenericApplicationTests<K, V>
        {
                recordGenerator.generate(true, false, messageSender);
 
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+               int numberOfValidMessages =
+                               recordGenerator.getNumberOfMessages() -
+                               recordGenerator.getNumberOfPoisonPills();
 
-               await("Consumer failed")
+               await(numberOfValidMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
 
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-               assertThat(recordHandler.receivedMessages)
-                               .describedAs("Received not all sent events")
-                               .isLessThan(numberOfGeneratedMessages);
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               });
 
                assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should have exited")
-                               .isFalse();
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -159,28 +156,29 @@ abstract class GenericApplicationTests<K, V>
        {
                recordGenerator.generate(false, true, messageSender);
 
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+               int numberOfValidMessages =
+                               recordGenerator.getNumberOfMessages() -
+                               recordGenerator.getNumberOfLogicErrors();
 
-               await("Consumer failed")
+               await(numberOfValidMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
 
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               .untilAsserted(() ->
+                               {
+                                       checkSeenOffsetsForProgress();
+                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+                               });
 
                assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should not be running")
-                               .isFalse();
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }