`EndlessConsumer` nimmt jetzt einzelne `ConsumerRecord`s entgegen
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 12:15:11 +0000 (14:15 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 16:08:45 +0000 (18:08 +0200)
* `@KafkaHandler` von Batch-Verarbeitung auf Einzel-Verarbeitung umgestellt.
* Den `ApplicationErrorHandler` um eine passende Fehler-Verarbeitung für
  die einzelne Verarbeitung der Nachrichten ergänzt
* Da der `MessageListenerContainer` nicht dazu zu bewegen ist, die
  Offset-Commits im Fehlerfall zu unterlassen, wird explizit ein Seek auf
  die Offset-Positionen der noch nicht verarbeiteten Nachrichten
  durchgeführt.
* Dabei wurde ein von Spring Kafka abgeschauter Trick verwendet: Es genügt,
  die bisher unverarbeiteten Nachrichten durchzugehen und jeweils den
  Offset der ersten Nachricht, die zu einer Partition gesehen wird, für
  den Seek vorzumerken. Denn wenn dabei für eine Partition keine Nachricht
  gefunden wird, hat entweder das letzte `poll() keine Nachricht zu der
  Partition geliefert, oder alle Nachrichten, die zu der Partition gehört
  haben, wurden erfolgreich verarbeitet. In beiden Fällen stimmt der
  Offset bereits, den die Kafka-Bibliothek intern pflegt, so dass kein
  Seek durchgeführt werden muss!
* Der Testfall wurde entsprechend angepasst und läuft daher in dieser
  Variante auch ohne Fehler, da der gespeicherte Zustand dadurch zu den
  bestätigten Offsets passt.

src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 6e15717..52c6a0c 100644 (file)
@@ -2,11 +2,15 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.kafka.listener.CommonErrorHandler;
 import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.util.Assert;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 
@@ -16,6 +20,11 @@ public class ApplicationErrorHandler implements CommonErrorHandler
   private Exception exception;
   private boolean ack = true;
 
+  @Override
+  public boolean remainingRecords()
+  {
+    return true;
+  }
 
   @Override
   public void handleOtherException(
@@ -24,7 +33,22 @@ public class ApplicationErrorHandler implements CommonErrorHandler
     MessageListenerContainer container,
     boolean batchListener)
   {
-    Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+    rememberExceptionAndStopContainer(thrownException, container);
+  }
+
+  @Override
+  public void handleRemaining(
+    Exception thrownException,
+    List<ConsumerRecord<?, ?>> records,
+    Consumer<?, ?> consumer,
+    MessageListenerContainer container)
+  {
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    records.forEach(record ->
+      offsets.computeIfAbsent(
+          new TopicPartition(record.topic(), record.partition()),
+          offset -> record.offset()));
+    offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
     rememberExceptionAndStopContainer(thrownException, container);
   }
 
index d3d11ae..01397a2 100644 (file)
@@ -26,14 +26,9 @@ public class EndlessConsumer<K, V>
       id = "${spring.kafka.client-id}",
       idIsGroup = false,
       topics = "${sumup.adder.topic}",
-      batch = "true",
       autoStartup = "false")
-  public void accept(List<ConsumerRecord<K, V>> records)
+  public void accept(ConsumerRecord<K, V> record)
   {
-        // Do something with the data...
-        log.info("{} - Received {} messages", id, records.size());
-        for (ConsumerRecord<K, V> record : records)
-        {
           log.info(
               "{} - {}: {}/{} - {}={}",
               id,
@@ -47,7 +42,6 @@ public class EndlessConsumer<K, V>
           recordHandler.accept(record);
 
           consumed++;
-        }
   }
 
   public void start()
index 753debe..124143c 100644 (file)
@@ -157,7 +157,7 @@ abstract class GenericApplicationTests<K, V>
 
        @Test
        @SkipWhenErrorCannotBeGenerated(logicError = true)
-       void doesNotCommitOffsetsOnLogicError()
+       void commitsOffsetsOfUnseenRecordsOnLogicError()
        {
                int numberOfGeneratedMessages =
                                recordGenerator.generate(false, true, messageSender);
@@ -168,7 +168,7 @@ abstract class GenericApplicationTests<K, V>
                                .until(() -> !endlessConsumer.running());
 
                checkSeenOffsetsForProgress();
-               assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
                endlessConsumer.start();
                await("Consumer failed")
@@ -176,7 +176,7 @@ abstract class GenericApplicationTests<K, V>
                                .pollInterval(Duration.ofSeconds(1))
                                .until(() -> !endlessConsumer.running());
 
-               assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
                assertThatNoException()
                                .describedAs("Consumer should not be running")