Springify: Configured a DLQ for poison pills
author     Kai Moritz <kai@juplo.de>
           Sat, 23 Apr 2022 08:11:17 +0000 (10:11 +0200)
committer  Kai Moritz <kai@juplo.de>
           Sun, 5 Jun 2022 12:14:42 +0000 (14:14 +0200)
* Configured the producer to use a `ByteArraySerializer`, so that it can
  serialize the faulty record, which is handed over as `byte[]`.
* For this, the `DefaultErrorHandler` is configured explicitly and wired
  with a `DeadLetterPublishingRecoverer` (a configuration sketch follows
  this list).
* The `DefaultErrorHandler` is configured with a `FixedBackOff` strategy
  that redirects a record to the DLQ topic immediately after the first
  error, without any retries.
* *TODO:* Exceptions that are thrown in the `MessageListener`, i.e. in
  the `EndlessConsumer`, are also sent to the DLQ by the
  `DeadLetterPublishingRecoverer` configured in the `DefaultErrorHandler`.
  The problems with this:
** As configured here, the producer cannot serialize these records yet.
** For these errors, the stop-the-world behaviour would actually be
   preferable anyway.

docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationErrorHandler.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/docker-compose.yml b/docker-compose.yml
index b03bb1e..317701f 100644
@@ -30,6 +30,8 @@ services:
       bash -c "
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
         kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic dlq
+        kafka-topics --bootstrap-server kafka:9092 --create --topic dlq --partitions 2
       "
 
   cli:
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 26b81cb..6ab716e 100644
@@ -1,11 +1,16 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
 
 import java.util.function.Consumer;
 
@@ -24,9 +29,19 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public ApplicationErrorHandler errorHandler()
+  public DeadLetterPublishingRecoverer recoverer(
+      ApplicationProperties properties,
+      KafkaOperations<?, ?> template)
   {
-    return new ApplicationErrorHandler();
+    return new DeadLetterPublishingRecoverer(
+        template,
+        (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
+  }
+
+  @Bean
+  public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
   }
 
   @Bean(destroyMethod = "close")
diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
deleted file mode 100644
index 273f509..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.listener.MessageListenerContainer;
-
-import java.util.List;
-import java.util.Optional;
-
-
-public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
-{
-  private Exception exception;
-
-
-  public synchronized Optional<Exception> getException()
-  {
-    return Optional.ofNullable(exception);
-  }
-
-  public synchronized void clearException()
-  {
-    this.exception = null;
-  }
-
-
-  @Override
-  public void handleOtherException(
-      Exception thrownException, Consumer<?, ?> consumer,
-      MessageListenerContainer container,
-      boolean batchListener)
-  {
-    this.exception = thrownException;
-    super.handleOtherException(thrownException, consumer, container, batchListener);
-  }
-
-  @Override
-  public void handleRemaining(
-      Exception thrownException,
-      List<ConsumerRecord<?, ?>> records,
-      Consumer<?, ?> consumer,
-      MessageListenerContainer container)
-  {
-    this.exception = thrownException;
-    super.handleRemaining(thrownException, records, consumer, container);
-  }
-
-  @Override
-  public void handleBatch(
-      Exception thrownException,
-      ConsumerRecords<?, ?> data,
-      Consumer<?, ?> consumer,
-      MessageListenerContainer container,
-      Runnable invokeListener)
-  {
-    this.exception = thrownException;
-    super.handleBatch(thrownException, data, consumer, container, invokeListener);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index dc3a26e..742550e 100644
@@ -16,17 +16,6 @@ public class ApplicationHealthIndicator implements HealthIndicator
   @Override
   public Health health()
   {
-    try
-    {
-      return consumer
-          .exitStatus()
-          .map(Health::down)
-          .orElse(Health.outOfService())
-          .build();
-    }
-    catch (IllegalStateException e)
-    {
-      return Health.up().build();
-    }
+    return consumer.isRunning() ? Health.up().build() : Health.down().build();
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c7c4f78..511d68d 100644
@@ -18,4 +18,7 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String topic;
+  @NotNull
+  @NotEmpty
+  private String dlqTopic;
 }
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index 480e7d1..8bc3115 100644
@@ -10,7 +10,6 @@ import org.springframework.web.bind.annotation.RestController;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
 
 @RestController
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 6d0c69d..04a0a3a 100644
@@ -14,7 +14,6 @@ import org.springframework.stereotype.Component;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Consumer;
 
 
@@ -29,8 +28,6 @@ public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
-  @Autowired
-  ApplicationErrorHandler errorHandler;
 
   private long consumed = 0;
 
@@ -113,7 +110,6 @@ public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
       throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
-    errorHandler.clearException();
     registry.getListenerContainer(id).start();
   }
 
@@ -127,11 +123,8 @@ public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
-  public synchronized Optional<Exception> exitStatus()
+  public synchronized boolean isRunning()
   {
-    if (registry.getListenerContainer(id).isChildRunning())
-      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-    return errorHandler.getException();
+    return registry.getListenerContainer(id).isChildRunning();
   }
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 43cde87..4b22bd2 100644
@@ -1,5 +1,6 @@
 consumer:
   topic: test
+  dlq-topic: dlq
 management:
   endpoint:
     shutdown:
@@ -27,10 +28,15 @@ spring:
       client-id: DEV
       auto-offset-reset: earliest
       group-id: my-group
-      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
+        spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer"
         spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
         spring.json.trusted.packages: "de.juplo.kafka"
+    producer:
+      bootstrap-servers: :9092
+      client-id: DEV
+      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 3f6a6a8..52ecc0f 100644
@@ -18,6 +18,7 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
@@ -48,6 +49,7 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
                properties = {
                                "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
@@ -91,7 +93,7 @@ class ApplicationTests
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
-                               .until(() -> receivedRecords.size() >= 100);
+                               .until(() -> receivedRecords.size() == 100);
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -101,35 +103,42 @@ class ApplicationTests
                                        compareToCommitedOffsets(newOffsets);
                                });
 
-               assertThatExceptionOfType(IllegalStateException.class)
-                               .isThrownBy(() -> endlessConsumer.exitStatus())
-                               .describedAs("Consumer should still be running");
+               assertThat(endlessConsumer.isRunning())
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
        }
 
        @Test
        @Order(2)
-       void commitsOffsetOfErrorForReprocessingOnError()
+       void commitsCurrentOffsetsOnError()
        {
                send100Messages((key, counter) ->
                                counter == 77
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                                                : serialize(key, counter));
 
-               await("Consumer failed")
+               await("99 records received")
                                .atMost(Duration.ofSeconds(30))
-                               .untilAsserted(() -> checkSeenOffsetsForProgress());
-
-               compareToCommitedOffsets(newOffsets);
-               assertThat(receivedRecords.size())
-                               .describedAs("Received not all sent events")
-                               .isLessThan(100);
-
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .containsInstanceOf(RecordDeserializationException.class)
-                               .describedAs("Consumer should have exited abnormally");
+                               .until(() -> receivedRecords.size() == 99);
+
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
+                               .untilAsserted(() ->
+                               {
+                                       // NOT NICE:
+                                       // This only works because valid records are still read
+                                       // after the record that triggers the deserialization
+                                       // error.
+                                       // REASON:
+                                       // The MessageHandler never sees the offset of the
+                                       // failed record!
+                                       checkSeenOffsetsForProgress();
+                                       compareToCommitedOffsets(newOffsets);
+                               });
+
+               assertThat(endlessConsumer.isRunning())
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
        }