Rebalance-Listener: Consumer zählt die gesehenen Schlüssel consumer/spring-consumer--rebalance-listener--generics4all consumer/spring-consumer--rebalance-listener--generics4some
authorKai Moritz <kai@juplo.de>
Sun, 27 Oct 2024 11:03:06 +0000 (12:03 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 10:58:17 +0000 (11:58 +0100)
* Die Counter werden mit dem ``String``-Key indiziert
** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope...
* GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt
** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
** `CounterStateController` kopiert die Map, um mögliche konkurierende
   Zugriffe während des Erzeugens der Ausgabe zu vermeiden.
* Der Zustand des Zählers wird in einem compacted Topic abgelegt
** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt
   wurden.
** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den
   zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf
   erfolgt.
* Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String`
** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten
   auch einfach mit `kafkacat` gelesen werden können.
* Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden
** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten
   Nachrichten als `acked` gezählt werden.
** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler
   hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als
   "bestätigt" gezählt würden.
** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen.
* Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt
* Log-Meldungen für das Senden des Zählerstands ergänzt
* Fix: Der Rebalance-Listener wurde nie registriert
* Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
* Fix: Nachrichten wurden ggf. doppelt verarbeitet
** Wenn man in einer Schliefe die Nachrichten pro Partition separat
   verarbeitet...
** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten
*   der gerade zu verarbeitenden Partition abrufen!
* Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
   Partition, die der Instanz gerade zugeteilt ist.
** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
   `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
   geliefert hat.
* Der Zustand wird aus dem ``state``-Topic wiederhergestellt
* Refactor: Logik für Counter in Klasse `CounterState` extrahiert
* Refactor: DRY für state-change zu ASSIGNED
* Refactor: DRY für state-change zu UNASSIGNED
* Refactor: Neue, klarere ``switch``-Syntax
* DRY für state-change zu RESTORING
* Refactor: Handling von pause/resume vollständig in State-Change-Methoden
* Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil
** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung,
   die den _neu_ hinzugefügten Partitionen zugedacht war, allen in
   `assignedPartitions` vermerkten Partitionen wiederfahren ist.
** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem
   Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_
   Partitionen entzieht, bevor er _dann alle_ neu zuteilt.
** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll
   funktionieren muss, wurde das Refactoring hier fortgeführt und so
   vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks
   `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht.
* Der Zählerzustand wird separat pro Partition verwaltet
** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
   die angefragte Instanz gerade verfügt.
* Refactor: Zustand muss `CounterState` vollständig übergeben werden
* Refactor: Enum `PartitionState` in `State` umbenannt
* Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht
* Setup mit ein bischen mehr Dampf (`README.sh` angepasst)
* TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung
** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn
   die Implementierung um Transaktionen erweitert wird
** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach
   der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte,
   versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt!
* Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert
* Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung

README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/CounterState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/CounterStateController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index b46e235..bdefd2b 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -27,13 +27,13 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 
 docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
+docker compose -f docker/docker-compose.yml up -d consumer-1
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
 
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
+docker compose -f docker/docker-compose.yml up -d consumer-2
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/
 
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml stop producer consumer-1
index 4fa2ead..6d6225d 100644 (file)
@@ -142,28 +142,34 @@ services:
       juplo.client-id: producer
       juplo.producer.topic: test
       juplo.producer.linger-ms: 666
-      juplo.producer.throttle-ms: 100
+      juplo.producer.throttle-ms: 10
 
   consumer:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
       juplo.consumer.topic: test
+      juplo.producer.linger-ms: 1000
+      logging.level.de.juplo: TRACE
 
   peter:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: peter
       juplo.consumer.topic: test
+      juplo.producer.linger-ms: 1000
+      logging.level.de.juplo: TRACE
 
   ute:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: ute
       juplo.consumer.topic: test
+      juplo.producer.linger-ms: 1000
+      logging.level.de.juplo: TRACE
 
 volumes:
   zookeeper-data:
diff --git a/pom.xml b/pom.xml
index dd96d00..daea167 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <version>1.1-rebalance-listener-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java
new file mode 100644 (file)
index 0000000..78df108
--- /dev/null
@@ -0,0 +1,24 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+public class CounterState<K>
+{
+  private final Map<K, Long> counterState;
+
+
+  public synchronized Long addToCounter(K key)
+  {
+    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+  }
+
+  public synchronized Map<K, Long> getCounterState()
+  {
+    return new HashMap<>(counterState);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/CounterStateController.java b/src/main/java/de/juplo/kafka/CounterStateController.java
new file mode 100644 (file)
index 0000000..494b510
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+
+@RestController
+@RequiredArgsConstructor
+public class CounterStateController<K>
+{
+  private final ExampleConsumer<K, ?> consumer;
+
+  @GetMapping
+  Map<Integer, Map<K, Long>> getAllCounters()
+  {
+    return consumer.getCounterState();
+  }
+}
index a6691c3..244ac82 100644 (file)
@@ -2,16 +2,18 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.*;
 
 
 @Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListener
 {
   private final String id;
   private final String topic;
@@ -20,6 +22,8 @@ public class ExampleConsumer<K, V> implements Runnable
   private final Runnable closeCallback;
 
   private volatile boolean running = false;
+  private final Set<TopicPartition> assignedPartitions = new HashSet<>();
+  private CounterState<K>[] counterState;
   private long consumed = 0;
 
 
@@ -45,8 +49,13 @@ public class ExampleConsumer<K, V> implements Runnable
   {
     try
     {
+      log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+      int numPartitions = consumer.partitionsFor(topic).size();
+      log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+      counterState = new CounterState[numPartitions];
+
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), this);
       running = true;
 
       while (running)
@@ -93,6 +102,48 @@ public class ExampleConsumer<K, V> implements Runnable
   {
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+
+    Long counter = computeCount(partition, key);
+    log.info("{} - current value for counter {}: {}", id, key, counter);
+  }
+
+  private synchronized Long computeCount(int partition, K key)
+  {
+    return counterState[partition].addToCounter(key);
+  }
+
+  public Map<Integer, Map<K, Long>> getCounterState()
+  {
+    Map<Integer, Map<K, Long>> result = new HashMap<>(assignedPartitions.size());
+    assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
+    return result;
+  }
+
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition ->
+      {
+        assignedPartitions.add(partition);
+        counterState[partition.partition()] = new CounterState<>(new HashMap<>());
+      });
+  }
+
+  @Override
+  public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition ->
+      {
+        assignedPartitions.remove(partition);
+        counterState[partition.partition()] = null;
+      });
   }
 
 
index ae119bf..b427efd 100644 (file)
@@ -20,6 +20,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "spring.kafka.consumer.auto-offset-reset=earliest",
     "juplo.consumer.topic=" + TOPIC })
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)