Wechsel auf den `StickyAssignor` löst die Rebalance-Fehler
authorKai Moritz <kai@juplo.de>
Tue, 30 Aug 2022 04:32:12 +0000 (06:32 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 Sep 2022 17:16:39 +0000 (19:16 +0200)
* Die durch Rebalances ausgelösten Zustand-Fehler bei regulären
  "Staffelübergaben" lassen sich vollständig durch ein Downgrade des
  `CooperativeStickyAssignor` auf den `StickyAssignor` lösen.
* *Achtung:* Der `StickyAssignor` verwendet das Eager-Protokoll.
* D.h., ein Down-Grade auf den `StickyAssignor` benötigt einen Reset
  der Gruppe, ist also nicht per Rolling Upgrade im laufenden Betrieb
  möglich.
* Vorführ-Skript so angepasst, dass man sofort sieht, dass diese
  Version alle regulären Rebalance-Fälle ohne Fehler durchführen kann.

README.sh
src/main/java/de/juplo/kafka/ApplicationConfiguration.java

index c9494b9..f337d5c 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -74,4 +74,17 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
+docker-compose stop adder-1
+until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
+
 docker-compose kill -s 9 peter klaus
index 1c69760..624a4ec 100644 (file)
@@ -77,7 +77,7 @@ public class ApplicationConfiguration
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());