From efb2f90f62b1e5bcf726bb7015d7d778e30ef368 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 27 Oct 2024 12:03:06 +0100 Subject: [PATCH] =?utf8?q?Rebalance-Listener:=20Consumer=20z=C3=A4hlt=20di?= =?utf8?q?e=20gesehenen=20Schl=C3=BCssel?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die Counter werden mit dem ``String``-Key indiziert ** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope... * GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt ** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert. ** `CounterStateController` kopiert die Map, um mögliche konkurierende Zugriffe während des Erzeugens der Ausgabe zu vermeiden. * Der Zustand des Zählers wird in einem compacted Topic abgelegt ** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt wurden. ** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf erfolgt. * Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String` ** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten auch einfach mit `kafkacat` gelesen werden können. * Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden ** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten Nachrichten als `acked` gezählt werden. ** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als "bestätigt" gezählt würden. ** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen. * Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt * Log-Meldungen für das Senden des Zählerstands ergänzt * Fix: Der Rebalance-Listener wurde nie registriert * Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert * Fix: Nachrichten wurden ggf. doppelt verarbeitet ** Wenn man in einer Schliefe die Nachrichten pro Partition separat verarbeitet... ** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten * der gerade zu verarbeitenden Partition abrufen! * Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen ** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_ Partition, die der Instanz gerade zugeteilt ist. ** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen geliefert hat. * Der Zustand wird aus dem ``state``-Topic wiederhergestellt * Refactor: Logik für Counter in Klasse `CounterState` extrahiert * Refactor: DRY für state-change zu ASSIGNED * Refactor: DRY für state-change zu UNASSIGNED * Refactor: Neue, klarere ``switch``-Syntax * DRY für state-change zu RESTORING * Refactor: Handling von pause/resume vollständig in State-Change-Methoden * Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil ** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung, die den _neu_ hinzugefügten Partitionen zugedacht war, allen in `assignedPartitions` vermerkten Partitionen wiederfahren ist. ** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_ Partitionen entzieht, bevor er _dann alle_ neu zuteilt. ** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll funktionieren muss, wurde das Refactoring hier fortgeführt und so vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht. * Der Zählerzustand wird separat pro Partition verwaltet ** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen. ** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand die angefragte Instanz gerade verfügt. * Refactor: Zustand muss `CounterState` vollständig übergeben werden * Refactor: Enum `PartitionState` in `State` umbenannt * Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht * Setup mit ein bischen mehr Dampf (`README.sh` angepasst) * TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung ** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn die Implementierung um Transaktionen erweitert wird ** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte, versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt! * Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert * Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung --- README.sh | 20 +++---- docker/docker-compose.yml | 14 +++-- pom.xml | 2 +- .../java/de/juplo/kafka/CounterState.java | 24 ++++++++ .../juplo/kafka/CounterStateController.java | 21 +++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 57 ++++++++++++++++++- .../java/de/juplo/kafka/ApplicationTests.java | 1 + 7 files changed, 121 insertions(+), 18 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/CounterState.java create mode 100644 src/main/java/de/juplo/kafka/CounterStateController.java diff --git a/README.sh b/README.sh index b46e2350..bdefd2bf 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer +docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2 if [[ $(docker image ls -q $IMAGE) == "" || @@ -27,13 +27,13 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer +docker compose -f docker/docker-compose.yml up -d consumer-1 +sleep 10 +docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/ -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer +docker compose -f docker/docker-compose.yml up -d consumer-2 +sleep 10 +docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/ +docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/ -docker compose -f docker/docker-compose.yml start consumer -sleep 5 - -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer +docker compose -f docker/docker-compose.yml stop producer consumer-1 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4fa2eade..6d6225db 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -142,28 +142,34 @@ services: juplo.client-id: producer juplo.producer.topic: test juplo.producer.linger-ms: 666 - juplo.producer.throttle-ms: 100 + juplo.producer.throttle-ms: 10 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer juplo.consumer.topic: test + juplo.producer.linger-ms: 1000 + logging.level.de.juplo: TRACE peter: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: peter juplo.consumer.topic: test + juplo.producer.linger-ms: 1000 + logging.level.de.juplo: TRACE ute: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: ute juplo.consumer.topic: test + juplo.producer.linger-ms: 1000 + logging.level.de.juplo: TRACE volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index dd96d00f..daea1679 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-rebalance-listener-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java new file mode 100644 index 00000000..78df1087 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CounterState.java @@ -0,0 +1,24 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + + +@RequiredArgsConstructor +public class CounterState +{ + private final Map counterState; + + + public synchronized Long addToCounter(K key) + { + return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1); + } + + public synchronized Map getCounterState() + { + return new HashMap<>(counterState); + } +} diff --git a/src/main/java/de/juplo/kafka/CounterStateController.java b/src/main/java/de/juplo/kafka/CounterStateController.java new file mode 100644 index 00000000..494b510d --- /dev/null +++ b/src/main/java/de/juplo/kafka/CounterStateController.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + + +@RestController +@RequiredArgsConstructor +public class CounterStateController +{ + private final ExampleConsumer consumer; + + @GetMapping + Map> getAllCounters() + { + return consumer.getCounterState(); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a6691c3b..244ac82c 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,16 +2,18 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; -import java.util.Arrays; +import java.util.*; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { private final String id; private final String topic; @@ -20,6 +22,8 @@ public class ExampleConsumer implements Runnable private final Runnable closeCallback; private volatile boolean running = false; + private final Set assignedPartitions = new HashSet<>(); + private CounterState[] counterState; private long consumed = 0; @@ -45,8 +49,13 @@ public class ExampleConsumer implements Runnable { try { + log.info("{} - Fetching PartitionInfo for topic {}", id, topic); + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + counterState = new CounterState[numPartitions]; + log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), this); running = true; while (running) @@ -93,6 +102,48 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + + Long counter = computeCount(partition, key); + log.info("{} - current value for counter {}: {}", id, key, counter); + } + + private synchronized Long computeCount(int partition, K key) + { + return counterState[partition].addToCounter(key); + } + + public Map> getCounterState() + { + Map> result = new HashMap<>(assignedPartitions.size()); + assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState())); + return result; + } + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions + .stream() + .filter(partition -> partition.topic().equals(topic)) + .forEach(partition -> + { + assignedPartitions.add(partition); + counterState[partition.partition()] = new CounterState<>(new HashMap<>()); + }); + } + + @Override + public synchronized void onPartitionsRevoked(Collection partitions) + { + partitions + .stream() + .filter(partition -> partition.topic().equals(topic)) + .forEach(partition -> + { + assignedPartitions.remove(partition); + counterState[partition.partition()] = null; + }); } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ae119bff..b427efd1 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -20,6 +20,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.auto-offset-reset=earliest", "juplo.consumer.topic=" + TOPIC }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -- 2.20.1