Bereits gesehene Nachrichten werden übersprungen
authorKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 08:25:55 +0000 (10:25 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 08:40:47 +0000 (10:40 +0200)
* Der `ApplicationRecordHandler` merkt sich für jede Partition den Offset
  der zuletzt erfolgreich verarbeiteten Nachrichten.
* Diese Offsets werden in der Mongo-DB mitgespeichert und daraus wieder
  hergestellt.
* Wenn der Handler eine Nachricht erhält, deren Offset nicht größer als
  der gemerkte Offset ist, wird diese mit einer Warnung verworfen.
* Vorführ-Skript so verschärft, dass deutlich wird, dass der Zustand durch
  die Verbesserung auch bei außerordentlichen Fehlern korrekt erhalten
  bleibt.

README.sh
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/ApplicationState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StateDocument.java

index f337d5c..3292f5f 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -76,7 +76,7 @@ http :8092/results/klaus | jq .[].sum | uniq
 
 docker-compose stop adder-1
 until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
 
 echo "Resultate für adder-2"
 http -v --pretty none -S :8092/results
@@ -87,4 +87,25 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
+docker-compose kill -s 9 adder-2
+docker-compose start adder-1
 docker-compose kill -s 9 peter klaus
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
+until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
+
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+
+sleep 5
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
index 0bfee67..5e1a12c 100644 (file)
@@ -31,7 +31,12 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
           stateRepository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
-      recordHandler.addPartition(partition, document.state);
+      log.info(
+        "{} - Offset of next unseen message for partition {}: {}",
+        id,
+        partition,
+        document.offset);
+      recordHandler.addPartition(partition, document.state, document.offset);
       for (String user : document.state.keySet())
       {
         log.info(
@@ -53,18 +58,28 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
       Integer partition = tp.partition();
       log.info("{} - removing partition: {}", id, partition);
       this.partitions.remove(partition);
-      Map<String, AdderResult> state = recordHandler.removePartition(partition);
-      for (String user : state.keySet())
+      ApplicationState state = recordHandler.removePartition(partition);
+      log.info(
+          "{} - offset of next unseen message for partition {} is {}",
+          id,
+          partition,
+          state.getOffset());
+      for (String user : state.getAdderState().keySet())
       {
         log.info(
             "{} - Saved state for partition={}|user={}: {}",
             id,
             partition,
             user,
-            state.get(user));
+            state.getAdderState().get(user));
       }
       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, state, results));
+      stateRepository.save(
+        new StateDocument(
+          partition,
+          state.getAdderState(),
+          results,
+          state.getOffset()));
     });
   }
 }
index 51d524f..ef38357 100644 (file)
@@ -19,6 +19,7 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   private final String id;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+  private final Map<Integer, Long> next = new HashMap<>();
 
 
   @Override
@@ -28,6 +29,16 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     String user = record.key();
     String message = record.value();
 
+    if (record.offset() < next.get(partition))
+    {
+      log.warn(
+        "{}- Dropping duplicate message: offset={} < next={}",
+        id,
+        record.offset(),
+        next.get(partition));
+      return;
+    }
+
     if (message.equals("CALCULATE"))
     {
       AdderResult result = state.get(partition).calculate(user);
@@ -39,6 +50,8 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
       state.get(partition).addToSum(user, Integer.parseInt(message));
     }
 
+    next.put(partition, record.offset() + 1);
+
     if (throttle.isPresent())
     {
       try
@@ -52,14 +65,22 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     }
   }
 
-  protected void addPartition(Integer partition, Map<String, AdderResult> state)
+  protected void addPartition(Integer partition, Map<String, AdderResult> state, Long offset)
   {
     this.state.put(partition, new AdderBusinessLogic(state));
+    this.next.put(partition, offset);
   }
 
-  protected Map<String, AdderResult> removePartition(Integer partition)
+  protected ApplicationState removePartition(Integer partition)
   {
-    return this.state.remove(partition).getState();
+    ApplicationState state =
+      new ApplicationState(
+        this.next.get(partition),
+        this.state.remove(partition).getState());
+
+    this.next.remove(partition);
+
+    return state;
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/ApplicationState.java b/src/main/java/de/juplo/kafka/ApplicationState.java
new file mode 100644 (file)
index 0000000..120e43a
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Getter
+public class ApplicationState
+{
+  private final Long offset;
+  private final Map<String, AdderResult> adderState;
+}
index ae8eb51..5c4ca22 100644 (file)
@@ -15,6 +15,7 @@ public class StateDocument
 {
   @Id
   public String id;
+  public long offset = 0l;
   public Map<String, AdderResult> state;
   public Map<String, List<AdderResult>> results;
 
@@ -32,10 +33,12 @@ public class StateDocument
   public StateDocument(
       Integer partition,
       Map<String, AdderResult> state,
-      Map<String, List<AdderResult>> results)
+      Map<String, List<AdderResult>> results,
+      long offset)
   {
     this.id = Integer.toString(partition);
     this.state = state;
     this.results = results;
+    this.offset = offset;
   }
 }