Die Implementierung speichert Zustand & Offsets vor _jedem_ `poll()`
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index ef38357..bc18d59 100644 (file)
@@ -73,13 +73,9 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
 
   protected ApplicationState removePartition(Integer partition)
   {
-    ApplicationState state =
-      new ApplicationState(
-        this.next.get(partition),
-        this.state.remove(partition).getState());
-
+    ApplicationState state = getState(partition);
     this.next.remove(partition);
-
+    this.state.remove(partition);
     return state;
   }
 
@@ -89,8 +85,11 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     return state;
   }
 
-  public AdderBusinessLogic getState(Integer partition)
+  public ApplicationState getState(Integer partition)
   {
-    return state.get(partition);
+    return
+      new ApplicationState(
+        this.next.get(partition),
+        this.state.get(partition).getState());
   }
 }