Ausgabe der verarbeiteten Nachrichten im Revoke-Callback entfernt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 6e460b4..7e243a9 100644 (file)
@@ -35,7 +35,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> lastOffsets = new HashMap<>();
 
 
   @Override
@@ -45,13 +44,10 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = lastOffsets.remove(partition);
       log.info(
-          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          "{} - removing partition: {}, offset of next message {})",
           id,
           partition,
-          newOffset - oldOffset,
-          oldOffset,
           newOffset);
       Map<String, Long> removed = seen.remove(partition);
       for (String key : removed.keySet())
@@ -80,7 +76,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
       consumer.seek(tp, document.offset);
-      lastOffsets.put(partition, document.offset);
       seen.put(partition, document.statistics);
     });
   }