Offset-Position in der MongoDB wird vor jedem `poll()` aktualisiert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 69c8294..b152310 100644 (file)
@@ -135,6 +135,8 @@ public class EndlessConsumer implements Runnable
           String key = record.key() == null ? "NULL" : record.key();
           seen.get(partition).increment(key);
         }
+
+        seen.forEach((tp, statistics) -> repository.save(new StatisticsDocument(statistics, consumer.position(tp))));
       }
     }
     catch(WakeupException e)