From 418b5761b1aaaa1ac1d772ad2185c9760a1c2ec7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 24 Jul 2022 16:15:23 +0200 Subject: [PATCH] Ausgabe der verarbeiteten Nachrichten im Revoke-Callback entfernt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Es musste allein für diese Ausgabe eine Map mit den zuletzt eingelesenen Offset-Positionen gepflegt werden. * Das ist zu viel Overhead, für die Randmeldung im Log. --- src/main/java/de/juplo/kafka/EndlessConsumer.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 3132bd2..2a69339 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -35,7 +35,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl private long consumed = 0; private final Map> seen = new HashMap<>(); - private final Map lastOffsets = new HashMap<>(); @Override @@ -45,13 +44,10 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl { Integer partition = tp.partition(); Long newOffset = consumer.position(tp); - Long oldOffset = lastOffsets.remove(partition); log.info( - "{} - removing partition: {}, consumed {} records (offset {} -> {})", + "{} - removing partition: {}, offset of next message {})", id, partition, - newOffset - oldOffset, - oldOffset, newOffset); Map removed = seen.remove(partition); for (String key : removed.keySet()) @@ -80,7 +76,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl .findById(Integer.toString(partition)) .orElse(new StatisticsDocument(partition)); consumer.seek(tp, document.offset); - lastOffsets.put(partition, document.offset); seen.put(partition, document.statistics); }); } -- 2.20.1