]> juplo.de Git - demos/kafka/training/commitdiff
`EndlessConsumer` nimmt jetzt einzelne `ConsumerRecord`s entgegen
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 12:15:11 +0000 (14:15 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 16:08:45 +0000 (18:08 +0200)
* `@KafkaHandler` von Batch-Verarbeitung auf Einzel-Verarbeitung umgestellt.
* Den `ApplicationErrorHandler` um eine passende Fehler-Verarbeitung für
  die einzelne Verarbeitung der Nachrichten ergänzt
* Da der `MessageListenerContainer` nicht dazu zu bewegen ist, die
  Offset-Commits im Fehlerfall zu unterlassen, wird explizit ein Seek auf
  die Offset-Positionen der noch nicht verarbeiteten Nachrichten
  durchgeführt.
* Dabei wurde ein von Spring Kafka abgeschauter Trick verwendet: Es genügt,
  die bisher unverarbeiteten Nachrichten durchzugehen und jeweils den
  Offset der ersten Nachricht, die zu einer Partition gesehen wird, für
  den Seek vorzumerken. Denn wenn dabei für eine Partition keine Nachricht
  gefunden wird, hat entweder das letzte `poll() keine Nachricht zu der
  Partition geliefert, oder alle Nachrichten, die zu der Partition gehört
  haben, wurden erfolgreich verarbeitet. In beiden Fällen stimmt der
  Offset bereits, den die Kafka-Bibliothek intern pflegt, so dass kein
  Seek durchgeführt werden muss!
* Der Testfall wurde entsprechend angepasst und läuft daher in dieser
  Variante auch ohne Fehler, da der gespeicherte Zustand dadurch zu den
  bestätigten Offsets passt.

src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 6e157173272bb629fc317c1aceb3033cbb2fedc7..52c6a0c2fdc936bba94006456446e32380b13b54 100644 (file)
@@ -2,11 +2,15 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.kafka.listener.CommonErrorHandler;
 import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.util.Assert;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 
@@ -16,6 +20,11 @@ public class ApplicationErrorHandler implements CommonErrorHandler
   private Exception exception;
   private boolean ack = true;
 
+  @Override
+  public boolean remainingRecords()
+  {
+    return true;
+  }
 
   @Override
   public void handleOtherException(
@@ -24,7 +33,22 @@ public class ApplicationErrorHandler implements CommonErrorHandler
     MessageListenerContainer container,
     boolean batchListener)
   {
-    Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+    rememberExceptionAndStopContainer(thrownException, container);
+  }
+
+  @Override
+  public void handleRemaining(
+    Exception thrownException,
+    List<ConsumerRecord<?, ?>> records,
+    Consumer<?, ?> consumer,
+    MessageListenerContainer container)
+  {
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    records.forEach(record ->
+      offsets.computeIfAbsent(
+          new TopicPartition(record.topic(), record.partition()),
+          offset -> record.offset()));
+    offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
     rememberExceptionAndStopContainer(thrownException, container);
   }
 
index d3d11ae976ae268527503aa2611328814c27d0a4..01397a208e4ee4ef6430d1d46b5f68b2e73edece 100644 (file)
@@ -26,14 +26,9 @@ public class EndlessConsumer<K, V>
       id = "${spring.kafka.client-id}",
       idIsGroup = false,
       topics = "${sumup.adder.topic}",
-      batch = "true",
       autoStartup = "false")
-  public void accept(List<ConsumerRecord<K, V>> records)
+  public void accept(ConsumerRecord<K, V> record)
   {
-        // Do something with the data...
-        log.info("{} - Received {} messages", id, records.size());
-        for (ConsumerRecord<K, V> record : records)
-        {
           log.info(
               "{} - {}: {}/{} - {}={}",
               id,
@@ -47,7 +42,6 @@ public class EndlessConsumer<K, V>
           recordHandler.accept(record);
 
           consumed++;
-        }
   }
 
   public void start()
index 753debe2b00344766c2c9a4f90286e0b6e075337..124143c32ad92a35dcbae0f9994a3e65997f348e 100644 (file)
@@ -157,7 +157,7 @@ abstract class GenericApplicationTests<K, V>
 
        @Test
        @SkipWhenErrorCannotBeGenerated(logicError = true)
-       void doesNotCommitOffsetsOnLogicError()
+       void commitsOffsetsOfUnseenRecordsOnLogicError()
        {
                int numberOfGeneratedMessages =
                                recordGenerator.generate(false, true, messageSender);
@@ -168,7 +168,7 @@ abstract class GenericApplicationTests<K, V>
                                .until(() -> !endlessConsumer.running());
 
                checkSeenOffsetsForProgress();
-               assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
                endlessConsumer.start();
                await("Consumer failed")
@@ -176,7 +176,7 @@ abstract class GenericApplicationTests<K, V>
                                .pollInterval(Duration.ofSeconds(1))
                                .until(() -> !endlessConsumer.running());
 
-               assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
                assertThatNoException()
                                .describedAs("Consumer should not be running")