WIP
authorKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 17:21:30 +0000 (19:21 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 17:21:30 +0000 (19:21 +0200)
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/StateDocument.java

index ba15227..cd36959 100644 (file)
@@ -17,6 +17,8 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
+  private final String topic;
+  private final Consumer consumer;
 
   private final Set<Integer> partitions = new HashSet<>();
 
@@ -32,6 +34,12 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
           stateRepository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
+      log.info(
+        "{} - Offset of next unseen message for partition {}: {}",
+        id,
+        partition,
+        document.offset);
+      consumer.seek(tp, document.offset);
       recordHandler.addPartition(partition, document.state);
       for (String user : document.state.keySet())
       {
@@ -55,6 +63,12 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
       log.info("{} - removing partition: {}", id, partition);
       this.partitions.remove(partition);
       Map<String, AdderResult> state = recordHandler.removePartition(partition);
+      Long offset = consumer.position(tp);
+      log.info(
+          "{} - offset of next unseen message for partition {} is {}",
+          id,
+          partition,
+          offset);
       for (String user : state.keySet())
       {
         log.info(
@@ -65,7 +79,7 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
             state.get(user));
       }
       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, state, results));
+      stateRepository.save(new StateDocument(partition, state, results, offset));
     });
   }
 }
index ae8eb51..5c4ca22 100644 (file)
@@ -15,6 +15,7 @@ public class StateDocument
 {
   @Id
   public String id;
+  public long offset = 0l;
   public Map<String, AdderResult> state;
   public Map<String, List<AdderResult>> results;
 
@@ -32,10 +33,12 @@ public class StateDocument
   public StateDocument(
       Integer partition,
       Map<String, AdderResult> state,
-      Map<String, List<AdderResult>> results)
+      Map<String, List<AdderResult>> results,
+      long offset)
   {
     this.id = Integer.toString(partition);
     this.state = state;
     this.results = results;
+    this.offset = offset;
   }
 }