Anwendung auf den Default-ErrorHandler umgestellt
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 17:19:15 +0000 (19:19 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 14:43:07 +0000 (16:43 +0200)
* Die Tests mussten entsprechend angepasst werden, da die Methode
  `EndlessConsumer.exitStatus()` aufgrund der Umstellung nicht mehr
  verfügbar ist.
* Die Logik der Tests wurde aber nicht geändert.
* Die Tests zeigen nur, dass die Anwendung sich nicht wie zuvor beendet.
* Durch manuelle Versuchen wurden folgende Erkenntnisse gewonnen:
** Im Fall eines Deserialisierungs-Fehlers begibt sich die Anwendung in
   eine Endlosschleife.
** Da, in der Fehlersituation keine Commits durchgeführt werden, hängt
   die Anwendung dann auch nach einem Neustart weiter in der
   Fehlerschleife.
** Im Fall eines Logik-Fehlers Startet ein Back-Off mit 10 Versuchen.
** Dabei werden nach jedem Fehler die Offsets für alle Partitionen für die
   der letzte `poll()` Nachrichten geliefert hatte, die noch nicht
   verarbeitet wurden, auf die nächste unverarbeitete Nachricht zurück
   gesetzt und anchließend wird `poll()` neu ausgeführt.
** Nach dem letzten Versuch springt die Anwendung über den Fehler hinweg.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationErrorHandler.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 1755747..c09eec3 100644 (file)
@@ -46,16 +46,9 @@ public class ApplicationConfiguration
         kafkaProperties.getClientId());
   }
 
-  @Bean
-  public ApplicationErrorHandler applicationErrorHandler()
-  {
-    return new ApplicationErrorHandler();
-  }
-
   @Bean
   public EndlessConsumer endlessConsumer(
       RecordHandler recordHandler,
-      ApplicationErrorHandler errorHandler,
       KafkaProperties kafkaProperties,
       KafkaListenerEndpointRegistry endpointRegistry)
   {
@@ -63,7 +56,6 @@ public class ApplicationConfiguration
         new EndlessConsumer(
             kafkaProperties.getClientId(),
             endpointRegistry,
-            errorHandler,
             recordHandler);
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
deleted file mode 100644 (file)
index 52c6a0c..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.kafka.listener.CommonErrorHandler;
-import org.springframework.kafka.listener.MessageListenerContainer;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-
-@Slf4j
-public class ApplicationErrorHandler implements CommonErrorHandler
-{
-  private Exception exception;
-  private boolean ack = true;
-
-  @Override
-  public boolean remainingRecords()
-  {
-    return true;
-  }
-
-  @Override
-  public void handleOtherException(
-    Exception thrownException,
-    Consumer<?, ?> consumer,
-    MessageListenerContainer container,
-    boolean batchListener)
-  {
-    rememberExceptionAndStopContainer(thrownException, container);
-  }
-
-  @Override
-  public void handleRemaining(
-    Exception thrownException,
-    List<ConsumerRecord<?, ?>> records,
-    Consumer<?, ?> consumer,
-    MessageListenerContainer container)
-  {
-    Map<TopicPartition, Long> offsets = new HashMap<>();
-    records.forEach(record ->
-      offsets.computeIfAbsent(
-          new TopicPartition(record.topic(), record.partition()),
-          offset -> record.offset()));
-    offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
-    rememberExceptionAndStopContainer(thrownException, container);
-  }
-
-  @Override
-  public void handleBatch(
-    Exception thrownException,
-    ConsumerRecords<?, ?> data,
-    Consumer<?, ?> consumer,
-    MessageListenerContainer container,
-    Runnable invokeListener)
-  {
-    // Do not commit the polled offsets on a logic-error
-    ack = false;
-    rememberExceptionAndStopContainer(thrownException, container);
-  }
-
-  private void rememberExceptionAndStopContainer(
-      Exception exception,
-      MessageListenerContainer container)
-  {
-    log.error("{}, stopping container {} abnormally", exception, container);
-    this.exception = exception;
-    container.stopAbnormally(() -> log.info("{} is stopped", container));
-  }
-
-  @Override
-  public boolean isAckAfterHandle()
-  {
-    return ack;
-  }
-
-
-  public Optional<Exception> getException()
-  {
-    return Optional.ofNullable(exception);
-  }
-
-  public void clearState()
-  {
-    this.exception = null;
-    this.ack = true;
-  }
-}
index ab9782c..e215c69 100644 (file)
@@ -16,17 +16,8 @@ public class ApplicationHealthIndicator implements HealthIndicator
   @Override
   public Health health()
   {
-    try
-    {
-      return consumer
-          .exitStatus()
-          .map(Health::down)
-          .orElse(Health.outOfService())
-          .build();
-    }
-    catch (IllegalStateException e)
-    {
-      return Health.up().build();
-    }
+    return consumer.running()
+        ? Health.up().build()
+        : Health.down().build();
   }
 }
index 655151a..27c1e44 100644 (file)
@@ -25,7 +25,6 @@ public class EndlessConsumer
 {
   private final String id;
   private final KafkaListenerEndpointRegistry registry;
-  private final ApplicationErrorHandler errorHandler;
   private final RecordHandler recordHandler;
 
   private long consumed = 0;
@@ -83,7 +82,6 @@ public class EndlessConsumer
       throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
-    errorHandler.clearState();
     registry.getListenerContainer(id).start();
   }
 
@@ -101,12 +99,4 @@ public class EndlessConsumer
   {
     return registry.getListenerContainer(id).isRunning();
   }
-
-  public Optional<Exception> exitStatus()
-  {
-    if (running())
-      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-    return errorHandler.getException();
-  }
 }
index 49ddb47..66a80ad 100644 (file)
@@ -109,9 +109,9 @@ abstract class GenericApplicationTests<K, V>
                                        assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
                                });
 
-               assertThatExceptionOfType(IllegalStateException.class)
-                               .isThrownBy(() -> endlessConsumer.exitStatus())
-                               .describedAs("Consumer should still be running");
+               assertThat(endlessConsumer.running())
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
 
                endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
@@ -144,12 +144,9 @@ abstract class GenericApplicationTests<K, V>
                                .describedAs("Received not all sent events")
                                .isLessThan(numberOfGeneratedMessages);
 
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RecordDeserializationException.class);
+               assertThat(endlessConsumer.running())
+                               .describedAs("Consumer should have exited")
+                               .isFalse();
 
                recordGenerator.assertBusinessLogic();
        }
@@ -177,12 +174,9 @@ abstract class GenericApplicationTests<K, V>
 
                assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
-               assertThatNoException()
+               assertThat(endlessConsumer.running())
                                .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RuntimeException.class);
+                               .isFalse();
 
                recordGenerator.assertBusinessLogic();
        }
@@ -392,10 +386,10 @@ abstract class GenericApplicationTests<K, V>
                        return new TestRecordHandler(applicationRecordHandler);
                }
 
-    @Bean(destroyMethod = "close")
-    public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
-    {
-      return factory.createConsumer();
-    }
+               @Bean(destroyMethod = "close")
+               public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
+               {
+                       return factory.createConsumer();
+               }
        }
 }