Rückbau auf einen Consumer, der in `onPartitionsRevoked()` nicht committed sumup-adder--rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 28 Aug 2022 14:01:22 +0000 (16:01 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 Sep 2022 17:14:52 +0000 (19:14 +0200)
* Dadurch sollte es bei einem Rebalance i.d.R. zu Fehlern in dem
  mitgeführten Zustand kommen, da die Verarbeitung nur im Zufall an dem
  Offset fortegführt wird, für den der Zustand gespeichert wurde.
* Um das vorherige Verhalten der Implementierung wiederherzustellen,
  müssen insbesondere die commits im Falle eines ordentlichen
  Herunterfahrens und eines Deserialisierungs-Fehlers wieder
  ergänzt werden. Denn ansonsten bestätigt die Implementierung die
  Offsets für die zuletzt erfolgreich verarbeiteten Nachrichten nicht.
* Vorführ-Skript so angepasst, dass man sofort sieht, dass in dieser
  Version schon eine reguläre "Staffelübergabe" - also auch schon ein
  normales Rebalance, das einfach durch das Starten eines zweiten
  Consumers ausgelöst wurde - ein Fehler auftritt.

README.sh
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 3292f5f..c9494b9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -74,38 +74,4 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
 docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
index e08cff4..1c69760 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -39,15 +38,13 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        consumer);
+        properties.getClientId());
   }
 
   @Bean
index eef0d00..0bfee67 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
@@ -17,7 +16,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
-  private final Consumer consumer;
 
   private final Set<Integer> partitions = new HashSet<>();
 
@@ -50,16 +48,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    try
-    {
-      consumer.commitSync();
-    }
-    catch (Exception e)
-    {
-      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
-    }
-
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
index f0e74d3..00678c4 100644 (file)
@@ -71,7 +71,8 @@ public class EndlessConsumer<K, V> implements Runnable
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING! Request to stop consumption.", id);
+      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -85,6 +86,7 @@ public class EndlessConsumer<K, V> implements Runnable
           offset,
           e.getCause().toString());
 
+      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)