From 1c97cf9d1a93d5cfb6f1123cb45f644238798608 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 2 Sep 2022 05:41:42 +0200 Subject: [PATCH] Die Implementierung speichert Zustand & Offsets vor _jedem_ `poll()` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit - Wenn Zustand & Offsets nur während eines Rebalances gespeichert werden, hilft das idempotente Verhalten der Implementierung bei einem Absturz nicht. - Grund: Die automatischen Commits von Kafka werden nicht in den gespeicherten Offsets reflektiert. - D.h., damit die Verbesserung wirkt, muss sicher gestellt sein, dass (mindestens) jedes mal, wenn ein Commit erfolgt, auch der Zustand und die Offsets in den Daten gespeichert werden. - Da ein Commit jedes mal ausgelöst werden _kann_, wenn `poll()` aufgerufen wird, müssen der Zustand und die Offsets also vor jedem Aufruf von `poll()` persistiert werden. --- README.sh | 2 +- .../kafka/ApplicationRebalanceListener.java | 36 +++++++++++++++++-- .../juplo/kafka/ApplicationRecordHandler.java | 15 ++++---- .../java/de/juplo/kafka/EndlessConsumer.java | 4 ++- .../de/juplo/kafka/RebalanceListener.java | 9 +++++ .../juplo/kafka/GenericApplicationTests.java | 3 +- 6 files changed, 55 insertions(+), 14 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/RebalanceListener.java diff --git a/README.sh b/README.sh index 3292f5f..1dde42d 100755 --- a/README.sh +++ b/README.sh @@ -19,7 +19,7 @@ if [[ ]] then docker-compose rm -svf adder-1 adder-2 - mvn -D skipTests clean install || exit + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 5e1a12c..86fe68a 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import java.util.*; @@ -10,7 +9,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener +public class ApplicationRebalanceListener implements RebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; @@ -82,4 +81,37 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener state.getOffset())); }); } + + @Override + public void beforeNextPoll() + { + partitions + .stream() + .forEach(partition -> + { + log.info("{} - persisting state & offset for partition: {}", id, partition); + ApplicationState state = recordHandler.getState(partition); + log.info( + "{} - offset of next unseen message for partition {} is {}", + id, + partition, + state.getOffset()); + for (String user : state.getAdderState().keySet()) + { + log.info( + "{} - Saved state for partition={}|user={}: {}", + id, + partition, + user, + state.getAdderState().get(user)); + } + Map> results = adderResults.getState(partition); + stateRepository.save( + new StateDocument( + partition, + state.getAdderState(), + results, + state.getOffset())); + }); + } } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index ef38357..bc18d59 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -73,13 +73,9 @@ public class ApplicationRecordHandler implements RecordHandler protected ApplicationState removePartition(Integer partition) { - ApplicationState state = - new ApplicationState( - this.next.get(partition), - this.state.remove(partition).getState()); - + ApplicationState state = getState(partition); this.next.remove(partition); - + this.state.remove(partition); return state; } @@ -89,8 +85,11 @@ public class ApplicationRecordHandler implements RecordHandler return state; } - public AdderBusinessLogic getState(Integer partition) + public ApplicationState getState(Integer partition) { - return state.get(partition); + return + new ApplicationState( + this.next.get(partition), + this.state.get(partition).getState()); } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..92802b9 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; + private final RebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -67,6 +67,8 @@ public class EndlessConsumer implements Runnable consumed++; } + + rebalanceListener.beforeNextPoll(); } } catch(WakeupException e) diff --git a/src/main/java/de/juplo/kafka/RebalanceListener.java b/src/main/java/de/juplo/kafka/RebalanceListener.java new file mode 100644 index 0000000..3c1208f --- /dev/null +++ b/src/main/java/de/juplo/kafka/RebalanceListener.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + + +public interface RebalanceListener extends ConsumerRebalanceListener +{ + default void beforeNextPoll() {} +} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8849317..8124c81 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -67,7 +66,7 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - ConsumerRebalanceListener rebalanceListener; + RebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; -- 2.20.1