Die Implementierung speichert Zustand & Offsets vor _jedem_ `poll()`
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 51d524f..bc18d59 100644 (file)
@@ -19,6 +19,7 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   private final String id;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+  private final Map<Integer, Long> next = new HashMap<>();
 
 
   @Override
@@ -28,6 +29,16 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     String user = record.key();
     String message = record.value();
 
+    if (record.offset() < next.get(partition))
+    {
+      log.warn(
+        "{}- Dropping duplicate message: offset={} < next={}",
+        id,
+        record.offset(),
+        next.get(partition));
+      return;
+    }
+
     if (message.equals("CALCULATE"))
     {
       AdderResult result = state.get(partition).calculate(user);
@@ -39,6 +50,8 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
       state.get(partition).addToSum(user, Integer.parseInt(message));
     }
 
+    next.put(partition, record.offset() + 1);
+
     if (throttle.isPresent())
     {
       try
@@ -52,14 +65,18 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     }
   }
 
-  protected void addPartition(Integer partition, Map<String, AdderResult> state)
+  protected void addPartition(Integer partition, Map<String, AdderResult> state, Long offset)
   {
     this.state.put(partition, new AdderBusinessLogic(state));
+    this.next.put(partition, offset);
   }
 
-  protected Map<String, AdderResult> removePartition(Integer partition)
+  protected ApplicationState removePartition(Integer partition)
   {
-    return this.state.remove(partition).getState();
+    ApplicationState state = getState(partition);
+    this.next.remove(partition);
+    this.state.remove(partition);
+    return state;
   }
 
 
@@ -68,8 +85,11 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     return state;
   }
 
-  public AdderBusinessLogic getState(Integer partition)
+  public ApplicationState getState(Integer partition)
   {
-    return state.get(partition);
+    return
+      new ApplicationState(
+        this.next.get(partition),
+        this.state.get(partition).getState());
   }
 }