Bereits gesehene Nachrichten werden übersprungen
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
index 6d3850f..5e1a12c 100644 (file)
@@ -5,9 +5,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.*;
 
 
@@ -34,7 +31,12 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
           stateRepository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
-      recordHandler.addPartition(partition, document.state);
+      log.info(
+        "{} - Offset of next unseen message for partition {}: {}",
+        id,
+        partition,
+        document.offset);
+      recordHandler.addPartition(partition, document.state, document.offset);
       for (String user : document.state.keySet())
       {
         log.info(
@@ -56,18 +58,28 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
       Integer partition = tp.partition();
       log.info("{} - removing partition: {}", id, partition);
       this.partitions.remove(partition);
-      Map<String, AdderResult> state = recordHandler.removePartition(partition);
-      for (String user : state.keySet())
+      ApplicationState state = recordHandler.removePartition(partition);
+      log.info(
+          "{} - offset of next unseen message for partition {} is {}",
+          id,
+          partition,
+          state.getOffset());
+      for (String user : state.getAdderState().keySet())
       {
         log.info(
             "{} - Saved state for partition={}|user={}: {}",
             id,
             partition,
             user,
-            state.get(user));
+            state.getAdderState().get(user));
       }
       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, state, results));
+      stateRepository.save(
+        new StateDocument(
+          partition,
+          state.getAdderState(),
+          results,
+          state.getOffset()));
     });
   }
 }