Fehler in Logging-Ausgabe korrigiert
authorKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 14:12:04 +0000 (16:12 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 14:12:04 +0000 (16:12 +0200)
* Der über den Merge hinzugefügt Test hat einen Fehler aufgedeckt.
* In onPartitionsRevoked() wurde bei der Berechnung der verarbeiteten
  Nachrichten für die Log-Ausgabe ein Nullzeiger dereferenziert.
* Ursache dafür war, dass die Map `offsets` in der Version, die die Offsets
  speichert gar nicht mehr gepflegt wurde.

src/main/java/de/juplo/kafka/EndlessConsumer.java

index 366a3c2..3132bd2 100644 (file)
@@ -35,7 +35,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
+  private final Map<Integer, Long> lastOffsets = new HashMap<>();
 
 
   @Override
@@ -45,7 +45,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = offsets.remove(partition);
+      Long oldOffset = lastOffsets.remove(partition);
       log.info(
           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
           id,
@@ -80,6 +80,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
       consumer.seek(tp, document.offset);
+      lastOffsets.put(partition, document.offset);
       seen.put(partition, document.statistics);
     });
   }