Offset-Position wird in der MongoDB gespeichert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index e67bf41..69c8294 100644 (file)
@@ -64,6 +64,7 @@ public class EndlessConsumer implements Runnable
       props.put("bootstrap.servers", bootstrapServer);
       props.put("group.id", groupId);
       props.put("client.id", id);
+      props.put("enable.auto.commit", false);
       props.put("auto.offset.reset", autoOffsetReset);
       props.put("metadata.max.age.ms", "1000");
       props.put("key.deserializer", StringDeserializer.class.getName());
@@ -90,7 +91,7 @@ public class EndlessConsumer implements Runnable
                   removed.getPartition(),
                   counter.getKey());
             }
-            repository.save(new StatisticsDocument(removed));
+            repository.save(new StatisticsDocument(removed, consumer.position(tp)));
           });
         }
 
@@ -100,12 +101,12 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - adding partition: {}", id, tp);
-            seen.put(
-                tp,
+            StatisticsDocument document =
                 repository
                     .findById(tp.toString())
-                    .map(PartitionStatistics::new)
-                    .orElse(new PartitionStatistics(tp)));
+                    .orElse(new StatisticsDocument(tp));
+            consumer.seek(tp, document.offset);
+            seen.put(tp, new PartitionStatistics(document));
           });
         }
       });