Fehler in Logging-Ausgabe korrigiert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 2a3445c..6e460b4 100644 (file)
@@ -35,7 +35,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
+  private final Map<Integer, Long> lastOffsets = new HashMap<>();
 
 
   @Override
@@ -45,7 +45,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = offsets.remove(partition);
+      Long oldOffset = lastOffsets.remove(partition);
       log.info(
           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
           id,
@@ -80,6 +80,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
       consumer.seek(tp, document.offset);
+      lastOffsets.put(partition, document.offset);
       seen.put(partition, document.statistics);
     });
   }