Ausgabe der verarbeiteten Nachrichten im Revoke-Callback entfernt
authorKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 14:15:23 +0000 (16:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 14:15:23 +0000 (16:15 +0200)
* Es musste allein für diese Ausgabe eine Map mit den zuletzt eingelesenen
  Offset-Positionen gepflegt werden.
* Das ist zu viel Overhead, für die Randmeldung im Log.

src/main/java/de/juplo/kafka/EndlessConsumer.java

index 3132bd2..2a69339 100644 (file)
@@ -35,7 +35,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> lastOffsets = new HashMap<>();
 
 
   @Override
@@ -45,13 +44,10 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = lastOffsets.remove(partition);
       log.info(
-          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          "{} - removing partition: {}, offset of next message {})",
           id,
           partition,
-          newOffset - oldOffset,
-          oldOffset,
           newOffset);
       Map<String, Long> removed = seen.remove(partition);
       for (String key : removed.keySet())
@@ -80,7 +76,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
       consumer.seek(tp, document.offset);
-      lastOffsets.put(partition, document.offset);
       seen.put(partition, document.statistics);
     });
   }