From 9fe70ad96d2b5a9ed0581057da54facba859ad1f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 28 Aug 2022 16:01:22 +0200 Subject: [PATCH] =?utf8?q?R=C3=BCckbau=20auf=20einen=20Consumer,=20der=20i?= =?utf8?q?n=20`onPartitionsRevoked()`=20nicht=20committed?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Dadurch sollte es bei einem Rebalance i.d.R. zu Fehlern in dem mitgeführten Zustand kommen, da die Verarbeitung nur im Zufall an dem Offset fortegführt wird, für den der Zustand gespeichert wurde. * Um das vorherige Verhalten der Implementierung wiederherzustellen, müssen insbesondere die commits im Falle eines ordentlichen Herunterfahrens und eines Deserialisierungs-Fehlers wieder ergänzt werden. Denn ansonsten bestätigt die Implementierung die Offsets für die zuletzt erfolgreich verarbeiteten Nachrichten nicht. * Vorführ-Skript so angepasst, dass man sofort sieht, dass in dieser Version schon eine reguläre "Staffelübergabe" - also auch schon ein normales Rebalance, das einfach durch das Starten eines zweiten Consumers ausgelöst wurde - ein Fehler auftritt. --- README.sh | 34 ------------------- .../juplo/kafka/ApplicationConfiguration.java | 5 +-- .../kafka/ApplicationRebalanceListener.java | 12 ------- .../java/de/juplo/kafka/EndlessConsumer.java | 4 ++- 4 files changed, 4 insertions(+), 51 deletions(-) diff --git a/README.sh b/README.sh index 3292f5f..c9494b9 100755 --- a/README.sh +++ b/README.sh @@ -74,38 +74,4 @@ http :8092/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-2" http :8092/results/klaus | jq .[].sum | uniq -docker-compose stop adder-1 -until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done -until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done - -echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq - -docker-compose kill -s 9 adder-2 -docker-compose start adder-1 docker-compose kill -s 9 peter klaus -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done -until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done - -echo "Resultate für adder-1" -http -v --pretty none -S :8091/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - -sleep 5 - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index e08cff4..1c69760 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -39,15 +38,13 @@ public class ApplicationConfiguration ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, - Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( recordHandler, adderResults, stateRepository, - properties.getClientId(), - consumer); + properties.getClientId()); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index eef0d00..0bfee67 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; @@ -17,7 +16,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; - private final Consumer consumer; private final Set partitions = new HashSet<>(); @@ -50,16 +48,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener @Override public void onPartitionsRevoked(Collection partitions) { - log.info("{} - Commiting offsets for all previously assigned partitions", id); - try - { - consumer.commitSync(); - } - catch (Exception e) - { - log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); - } - partitions.forEach(tp -> { Integer partition = tp.partition(); diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index f0e74d3..00678c4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -71,7 +71,8 @@ public class EndlessConsumer implements Runnable } catch(WakeupException e) { - log.info("{} - RIIING! Request to stop consumption.", id); + log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); + consumer.commitSync(); shutdown(); } catch(RecordDeserializationException e) @@ -85,6 +86,7 @@ public class EndlessConsumer implements Runnable offset, e.getCause().toString()); + consumer.commitSync(); shutdown(e); } catch(Exception e) -- 2.20.1