Offset-Position in der MongoDB wird vor jedem `poll()` aktualisiert
authorKai Moritz <kai@juplo.de>
Wed, 6 Apr 2022 22:00:57 +0000 (00:00 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 6 Apr 2022 22:11:49 +0000 (00:11 +0200)
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 69c8294..b152310 100644 (file)
@@ -135,6 +135,8 @@ public class EndlessConsumer implements Runnable
           String key = record.key() == null ? "NULL" : record.key();
           seen.get(partition).increment(key);
         }
+
+        seen.forEach((tp, statistics) -> repository.save(new StatisticsDocument(statistics, consumer.position(tp))));
       }
     }
     catch(WakeupException e)