From 5805651c16e07a0710b88c2822894941f67c313e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 28 Aug 2022 15:59:54 +0200 Subject: [PATCH] =?utf8?q?R=C3=BCckbau=20auf=20einen=20Consumer,=20der=20i?= =?utf8?q?n=20`onPartitionsRevoked()`=20immer=20committed?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Entfernt wird hier das erweiterte Interface, für den Rebalance-Listener über den die Consumer-Implementierung die Commits für den Fehlerfall explizit deaktivieren kann. * Die Staffelübergabe sollte damit weiterhin normal funktionieren. D.h., solange der Consumer ordentlich heruntergefahren wird und nicht ein besonders hohes Nachrichten-Aufkommen angelegt wird. * Vorführ-Skript so angepasst, dass deutlich wird, dass die "Staffelübergabe" nun funktioniert, wenn Consumer ordentlich gestopped werden, aber weiterhin Fehler auftreten, wenn ein Consumer außerordentlich beendet (hier: getötet) wird. --- README.sh | 67 ++++++++----------- .../kafka/ApplicationRebalanceListener.java | 34 +++------- .../java/de/juplo/kafka/EndlessConsumer.java | 6 +- .../de/juplo/kafka/RebalanceListener.java | 10 --- .../juplo/kafka/GenericApplicationTests.java | 3 +- 5 files changed, 41 insertions(+), 79 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/RebalanceListener.java diff --git a/README.sh b/README.sh index 6be4b11..81213c1 100755 --- a/README.sh +++ b/README.sh @@ -28,89 +28,78 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d gateway requests-1 requests-2 adder-1 adder-2 +docker-compose up -d gateway requests-1 requests-2 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done docker-compose up -d peter klaus +docker-compose up -d adder-1 +while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done http -v --pretty none -S :8091/results echo -while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done -http -v --pretty none -S :8092/results -echo + sleep 3 echo "Resultate für adder-1" http -v --pretty none -S :8091/results echo + echo "Resultate für peter von adder-1" http :8091/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-1" http :8091/results/klaus | jq .[].sum | uniq -echo "Resultate für adder-2" + + +docker-compose up -d adder-2 +while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done +while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done http -v --pretty none -S :8092/results echo -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq -docker-compose stop adder-1 -sleep 1 +sleep 3 echo "Resultate für adder-2" http -v --pretty none -S :8092/results echo + +echo "Resultate für peter von adder-1" +http :8091/results/peter | jq .[].sum | uniq +echo "Resultate für klaus von adder-1" +http :8091/results/klaus | jq .[].sum | uniq + echo "Resultate für peter von adder-2" http :8092/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-2" http :8092/results/klaus | jq .[].sum | uniq docker-compose stop adder-2 -docker-compose start adder-1 -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done +while [[ "$(http :8091/results | jq -r '.[]|contains({peter})' | grep true)" != "true" ]]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done +while [[ "$(http :8091/results | jq -r '.[]|contains({klaus})' | grep true)" != "true" ]]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done + echo "Resultate für adder-1" http -v --pretty none -S :8091/results echo + echo "Resultate für peter von adder-1" http :8091/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-1" http :8091/results/klaus | jq .[].sum | uniq -docker-compose start adder-2 -while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done -while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done +docker-compose kill -s 9 adder-1 +docker-compose start adder-1 +while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done +while [[ "$(http :8091/results | jq -r '.[]|contains({peter})' | grep true)" != "true" ]]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done +while [[ "$(http :8091/results | jq -r '.[]|contains({klaus})' | grep true)" != "true" ]]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done + echo "Resultate für adder-1" http -v --pretty none -S :8091/results echo -echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo + echo "Resultate für peter von adder-1" http :8091/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-1" http :8091/results/klaus | jq .[].sum | uniq -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq - -docker-compose kill -s 9 adder-1 -docker-compose start adder-1 -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done docker-compose kill -s 9 peter klaus -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 63d57df..eef0d00 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -3,6 +3,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import java.util.*; @@ -10,7 +11,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements RebalanceListener +public class ApplicationRebalanceListener implements ConsumerRebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; @@ -20,8 +21,6 @@ public class ApplicationRebalanceListener implements RebalanceListener private final Set partitions = new HashSet<>(); - private boolean commitsEnabled = true; - @Override public void onPartitionsAssigned(Collection partitions) { @@ -51,17 +50,14 @@ public class ApplicationRebalanceListener implements RebalanceListener @Override public void onPartitionsRevoked(Collection partitions) { - if (commitsEnabled) + log.info("{} - Commiting offsets for all previously assigned partitions", id); + try { - log.info("{} - Commiting offsets for all previously assigned partitions", id); - try - { - consumer.commitSync(); - } - catch (Exception e) - { - log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); - } + consumer.commitSync(); + } + catch (Exception e) + { + log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); } partitions.forEach(tp -> @@ -83,16 +79,4 @@ public class ApplicationRebalanceListener implements RebalanceListener stateRepository.save(new StateDocument(partition, state, results)); }); } - - @Override - public void enableCommits() - { - commitsEnabled = true; - } - - @Override - public void disableCommits() - { - commitsEnabled = false; - } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 3ff479c..f0e74d3 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final RebalanceListener rebalanceListener; + private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -42,7 +42,6 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - rebalanceListener.enableCommits(); consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) @@ -90,8 +89,7 @@ public class EndlessConsumer implements Runnable } catch(Exception e) { - log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e); - rebalanceListener.disableCommits(); + log.error("{} - Unexpected error: {}", id, e.toString(), e); shutdown(e); } finally diff --git a/src/main/java/de/juplo/kafka/RebalanceListener.java b/src/main/java/de/juplo/kafka/RebalanceListener.java deleted file mode 100644 index 26f97aa..0000000 --- a/src/main/java/de/juplo/kafka/RebalanceListener.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; - - -public interface RebalanceListener extends ConsumerRebalanceListener -{ - void enableCommits(); - void disableCommits(); -} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8124c81..8849317 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -66,7 +67,7 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - RebalanceListener rebalanceListener; + ConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; -- 2.20.1