Die Implementierung speichert Zustand & Offsets vor _jedem_ `poll()` sumup-adder--drop-duplicates sumup-adder--drop-duplicates---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Fri, 2 Sep 2022 03:41:42 +0000 (05:41 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 08:40:49 +0000 (10:40 +0200)
- Wenn Zustand & Offsets nur während eines Rebalances gespeichert werden,
  hilft das idempotente Verhalten der Implementierung bei einem Absturz
  nicht.
- Grund: Die automatischen Commits von Kafka werden nicht in den
  gespeicherten Offsets reflektiert.
- D.h., damit die Verbesserung wirkt, muss sicher gestellt sein, dass
  (mindestens) jedes mal, wenn ein Commit erfolgt, auch der Zustand und
  die Offsets in den Daten gespeichert werden.
- Da ein Commit jedes mal ausgelöst werden _kann_, wenn `poll()`
  aufgerufen wird, müssen der Zustand und die Offsets also vor jedem
  Aufruf von `poll()` persistiert werden.

README.sh
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/RebalanceListener.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 3292f5f..1dde42d 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -19,7 +19,7 @@ if [[
 ]]
 then
   docker-compose rm -svf adder-1 adder-2
-  mvn -D skipTests clean install || exit
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
index 5e1a12c..86fe68a 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.*;
@@ -10,7 +9,7 @@ import java.util.*;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+public class ApplicationRebalanceListener implements RebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
@@ -82,4 +81,37 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
           state.getOffset()));
     });
   }
+
+  @Override
+  public void beforeNextPoll()
+  {
+    partitions
+      .stream()
+      .forEach(partition ->
+      {
+        log.info("{} - persisting state & offset for partition: {}", id, partition);
+        ApplicationState state = recordHandler.getState(partition);
+        log.info(
+          "{} - offset of next unseen message for partition {} is {}",
+          id,
+          partition,
+          state.getOffset());
+        for (String user : state.getAdderState().keySet())
+        {
+          log.info(
+            "{} - Saved state for partition={}|user={}: {}",
+            id,
+            partition,
+            user,
+            state.getAdderState().get(user));
+        }
+        Map<String, List<AdderResult>> results = adderResults.getState(partition);
+        stateRepository.save(
+          new StateDocument(
+            partition,
+            state.getAdderState(),
+            results,
+            state.getOffset()));
+      });
+  }
 }
index ef38357..bc18d59 100644 (file)
@@ -73,13 +73,9 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
 
   protected ApplicationState removePartition(Integer partition)
   {
-    ApplicationState state =
-      new ApplicationState(
-        this.next.get(partition),
-        this.state.remove(partition).getState());
-
+    ApplicationState state = getState(partition);
     this.next.remove(partition);
-
+    this.state.remove(partition);
     return state;
   }
 
@@ -89,8 +85,11 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
     return state;
   }
 
-  public AdderBusinessLogic getState(Integer partition)
+  public ApplicationState getState(Integer partition)
   {
-    return state.get(partition);
+    return
+      new ApplicationState(
+        this.next.get(partition),
+        this.state.get(partition).getState());
   }
 }
index 00678c4..92802b9 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener rebalanceListener;
+  private final RebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -67,6 +67,8 @@ public class EndlessConsumer<K, V> implements Runnable
 
           consumed++;
         }
+
+        rebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
diff --git a/src/main/java/de/juplo/kafka/RebalanceListener.java b/src/main/java/de/juplo/kafka/RebalanceListener.java
new file mode 100644 (file)
index 0000000..3c1208f
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface RebalanceListener extends ConsumerRebalanceListener
+{
+  default void beforeNextPoll() {}
+}
index 8849317..8124c81 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -67,7 +66,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       ConsumerRebalanceListener rebalanceListener;
+       RebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;