From 316c89aa6aafbc339eda3727638c75f5489c0a99 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 14:15:11 +0200 Subject: [PATCH] `EndlessConsumer` nimmt jetzt einzelne `ConsumerRecord`s entgegen MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * `@KafkaHandler` von Batch-Verarbeitung auf Einzel-Verarbeitung umgestellt. * Den `ApplicationErrorHandler` um eine passende Fehler-Verarbeitung für die einzelne Verarbeitung der Nachrichten ergänzt * Da der `MessageListenerContainer` nicht dazu zu bewegen ist, die Offset-Commits im Fehlerfall zu unterlassen, wird explizit ein Seek auf die Offset-Positionen der noch nicht verarbeiteten Nachrichten durchgeführt. * Dabei wurde ein von Spring Kafka abgeschauter Trick verwendet: Es genügt, die bisher unverarbeiteten Nachrichten durchzugehen und jeweils den Offset der ersten Nachricht, die zu einer Partition gesehen wird, für den Seek vorzumerken. Denn wenn dabei für eine Partition keine Nachricht gefunden wird, hat entweder das letzte `poll() keine Nachricht zu der Partition geliefert, oder alle Nachrichten, die zu der Partition gehört haben, wurden erfolgreich verarbeitet. In beiden Fällen stimmt der Offset bereits, den die Kafka-Bibliothek intern pflegt, so dass kein Seek durchgeführt werden muss! * Der Testfall wurde entsprechend angepasst und läuft daher in dieser Variante auch ohne Fehler, da der gespeicherte Zustand dadurch zu den bestätigten Offsets passt. --- .../juplo/kafka/ApplicationErrorHandler.java | 28 +++++++++++++++++-- .../java/de/juplo/kafka/EndlessConsumer.java | 8 +----- .../juplo/kafka/GenericApplicationTests.java | 6 ++-- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java index 6e15717..52c6a0c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -2,11 +2,15 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.util.Assert; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; @@ -16,6 +20,11 @@ public class ApplicationErrorHandler implements CommonErrorHandler private Exception exception; private boolean ack = true; + @Override + public boolean remainingRecords() + { + return true; + } @Override public void handleOtherException( @@ -24,7 +33,22 @@ public class ApplicationErrorHandler implements CommonErrorHandler MessageListenerContainer container, boolean batchListener) { - Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleRemaining( + Exception thrownException, + List> records, + Consumer consumer, + MessageListenerContainer container) + { + Map offsets = new HashMap<>(); + records.forEach(record -> + offsets.computeIfAbsent( + new TopicPartition(record.topic(), record.partition()), + offset -> record.offset())); + offsets.forEach((tp, offset) -> consumer.seek(tp, offset)); rememberExceptionAndStopContainer(thrownException, container); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index d3d11ae..01397a2 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -26,14 +26,9 @@ public class EndlessConsumer id = "${spring.kafka.client-id}", idIsGroup = false, topics = "${sumup.adder.topic}", - batch = "true", autoStartup = "false") - public void accept(List> records) + public void accept(ConsumerRecord record) { - // Do something with the data... - log.info("{} - Received {} messages", id, records.size()); - for (ConsumerRecord record : records) - { log.info( "{} - {}: {}/{} - {}={}", id, @@ -47,7 +42,6 @@ public class EndlessConsumer recordHandler.accept(record); consumed++; - } } public void start() diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 753debe..124143c 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -157,7 +157,7 @@ abstract class GenericApplicationTests @Test @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() + void commitsOffsetsOfUnseenRecordsOnLogicError() { int numberOfGeneratedMessages = recordGenerator.generate(false, true, messageSender); @@ -168,7 +168,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -176,7 +176,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") -- 2.20.1