Merge der Refaktorisierung des EndlessConsumer (Branch 'deserialization')
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 8802df9..a21dd86 100644 (file)
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
 public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
 {
   private final ExecutorService executor;
+  private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
@@ -62,6 +63,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
             partition,
             key);
       }
+      repository.save(new StatisticsDocument(partition, removed));
     });
   }
 
@@ -74,7 +76,12 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
       Long offset = consumer.position(tp);
       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
       offsets.put(partition, offset);
-      seen.put(partition, new HashMap<>());
+      seen.put(
+          partition,
+          repository
+              .findById(Integer.toString(tp.partition()))
+              .map(document -> document.statistics)
+              .orElse(new HashMap<>()));
     });
   }