]> juplo.de Git - demos/kafka/training/commitdiff
Rückbau auf einen Consumer, der in `onPartitionsRevoked()` nicht committed sumup-adder--rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 28 Aug 2022 14:01:22 +0000 (16:01 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 Sep 2022 17:14:52 +0000 (19:14 +0200)
* Dadurch sollte es bei einem Rebalance i.d.R. zu Fehlern in dem
  mitgeführten Zustand kommen, da die Verarbeitung nur im Zufall an dem
  Offset fortegführt wird, für den der Zustand gespeichert wurde.
* Um das vorherige Verhalten der Implementierung wiederherzustellen,
  müssen insbesondere die commits im Falle eines ordentlichen
  Herunterfahrens und eines Deserialisierungs-Fehlers wieder
  ergänzt werden. Denn ansonsten bestätigt die Implementierung die
  Offsets für die zuletzt erfolgreich verarbeiteten Nachrichten nicht.
* Vorführ-Skript so angepasst, dass man sofort sieht, dass in dieser
  Version schon eine reguläre "Staffelübergabe" - also auch schon ein
  normales Rebalance, das einfach durch das Starten eines zweiten
  Consumers ausgelöst wurde - ein Fehler auftritt.

README.sh
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 3292f5fbd019bf213aef2188643c8908ee887b82..c9494b9be839cd0c05bd726364483c9bcc16aff2 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -74,38 +74,4 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
 docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
index e08cff411c72f076d78e5442a40c0a6ee41e2fca..1c6976010e7a1080df53f4590c4249569c5c1a10 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -39,15 +38,13 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        consumer);
+        properties.getClientId());
   }
 
   @Bean
index eef0d00d73e07f8eda6053d3334498f439e6f7d7..0bfee676e7140ad6bb4c4aad87deca36a2369436 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
@@ -17,7 +16,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
-  private final Consumer consumer;
 
   private final Set<Integer> partitions = new HashSet<>();
 
@@ -50,16 +48,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    try
-    {
-      consumer.commitSync();
-    }
-    catch (Exception e)
-    {
-      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
-    }
-
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
index f0e74d39fd1737d59fdaa22f98815baca4c61df3..00678c45cf344dd43e5e4d5b5bdcbea74bd1a75d 100644 (file)
@@ -71,7 +71,8 @@ public class EndlessConsumer<K, V> implements Runnable
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING! Request to stop consumption.", id);
+      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -85,6 +86,7 @@ public class EndlessConsumer<K, V> implements Runnable
           offset,
           e.getCause().toString());
 
+      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)