From c9530eaa16e300c5ed812b72cfaa86e816c75824 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 21 Feb 2025 11:41:54 +0100 Subject: [PATCH] Log-Compaction: Der Zustand wird in einem Topic gespeichert MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die Counter werden mit dem ``String``-Key indiziert ** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope... * GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt ** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert. ** `CounterStateController` kopiert die Map, um mögliche konkurierende Zugriffe während des Erzeugens der Ausgabe zu vermeiden. * Der Zustand des Zählers wird in einem compacted Topic abgelegt ** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt wurden. ** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf erfolgt. * Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String` ** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten auch einfach mit `kafkacat` gelesen werden können. * Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden ** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten Nachrichten als `acked` gezählt werden. ** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als "bestätigt" gezählt würden. ** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen. * Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt * Log-Meldungen für das Senden des Zählerstands ergänzt * Fix: Der Rebalance-Listener wurde nie registriert * Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert * Fix: Nachrichten wurden ggf. doppelt verarbeitet ** Wenn man in einer Schliefe die Nachrichten pro Partition separat verarbeitet... ** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten * der gerade zu verarbeitenden Partition abrufen! * Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen ** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_ Partition, die der Instanz gerade zugeteilt ist. ** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen geliefert hat. * Der Zustand wird aus dem ``state``-Topic wiederhergestellt * Refactor: Logik für Counter in Klasse `CounterState` extrahiert * Refactor: DRY für state-change zu ASSIGNED * Refactor: DRY für state-change zu UNASSIGNED * Refactor: Neue, klarere ``switch``-Syntax * DRY für state-change zu RESTORING * Refactor: Handling von pause/resume vollständig in State-Change-Methoden * Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil ** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung, die den _neu_ hinzugefügten Partitionen zugedacht war, allen in `assignedPartitions` vermerkten Partitionen wiederfahren ist. ** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_ Partitionen entzieht, bevor er _dann alle_ neu zuteilt. ** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll funktionieren muss, wurde das Refactoring hier fortgeführt und so vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht. * Der Zählerzustand wird separat pro Partition verwaltet ** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen. ** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand die angefragte Instanz gerade verfügt. * Refactor: Zustand muss `CounterState` vollständig übergeben werden * Refactor: Enum `PartitionState` in `State` umbenannt * Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht * Setup mit ein bischen mehr Dampf (`README.sh` angepasst) * TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung ** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn die Implementierung um Transaktionen erweitert wird ** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte, versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt! * Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert * Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung, um an dem Beispiel die Verwendung des Rebalance-Listeners vorführen zu können. (D.h., das Speichern/Wiederherstellen des Zustands wurde entfernt.) * Umkehrung des Rückbaus, so dass der Zustand wieder in einem Topic mit Log-Compaction gespeichert und aus diesem Wiederhergestellt wird. * Die bisherige Implementierung notdürftig an die Typisierung angeapsst. --- README.sh | 10 +- docker/docker-compose.yml | 27 +- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 29 +- .../de/juplo/kafka/ApplicationProperties.java | 27 ++ .../java/de/juplo/kafka/ExampleConsumer.java | 352 ++++++++++++++++-- src/main/resources/application.yml | 12 + .../java/de/juplo/kafka/ApplicationTests.java | 10 +- 8 files changed, 429 insertions(+), 40 deletions(-) diff --git a/README.sh b/README.sh index bdefd2bf..07f7de4c 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-log-compaction-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2 +docker compose -f docker/docker-compose.yml rm -svf consumer-1 if [[ $(docker image ls -q $IMAGE) == "" || @@ -28,12 +28,8 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 docker compose -f docker/docker-compose.yml up -d producer docker compose -f docker/docker-compose.yml up -d consumer-1 -sleep 10 -docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/ -docker compose -f docker/docker-compose.yml up -d consumer-2 sleep 10 -docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/ -docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/ +docker compose -f docker/docker-compose.yml exec cli http consumer-1:8881/ docker compose -f docker/docker-compose.yml stop producer consumer-1 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6d6225db..d58151ab 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -95,11 +95,17 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic state else kafka-topics --bootstrap-server kafka:9092 \ --delete \ --if-exists \ --topic test + kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic state \ + kafka-topics --bootstrap-server kafka:9092 \ --create \ --topic test \ @@ -108,7 +114,20 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ - && date > INITIALIZED + && \ + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic state \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + --config cleanup.policy=compact \ + --config segment.ms=3000 \ + --config max.compaction.lag.ms=5000 \ + && echo Das Topic \'state\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \ + && \ + date > INITIALIZED fi stop_grace_period: 0s depends_on: @@ -145,7 +164,7 @@ services: juplo.producer.throttle-ms: 10 consumer: - image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT + image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer @@ -154,7 +173,7 @@ services: logging.level.de.juplo: TRACE peter: - image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT + image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: peter @@ -163,7 +182,7 @@ services: logging.level.de.juplo: TRACE ute: - image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT + image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: ute diff --git a/pom.xml b/pom.xml index daea1679..4d29574e 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-rebalance-listener-SNAPSHOT + 1.1-log-compaction-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index d2b8e05c..ac5431af 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,8 +2,11 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -19,6 +22,7 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, + Producer kafkaProducer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -27,6 +31,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, + properties.getProducerProperties().getTopic(), + kafkaProducer, () -> applicationContext.close()); } @@ -46,10 +52,29 @@ public class ApplicationConfiguration props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); } props.put("metadata.maxage.ms", 5000); // 5 Sekunden - props.put("partition.assignment.strategy", StickyAssignor.class.getName()); + props.put("partition.assignment.strategy", RangeAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return new KafkaConsumer<>(props); } + + @Bean + public KafkaProducer kafkaProducer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("acks", properties.getProducerProperties().getAcks()); + props.put("batch.size", properties.getProducerProperties().getBatchSize()); + props.put("metadata.maxage.ms", 5000); // 5 Sekunden + props.put("delivery.timeout.ms", 20000); // 20 Sekunden + props.put("request.timeout.ms", 10000); // 10 Sekunden + props.put("linger.ms", properties.getProducerProperties().getLingerMs()); + props.put("compression.type", properties.getProducerProperties().getCompressionType()); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9f..0b431593 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -25,6 +25,8 @@ public class ApplicationProperties @NotNull private ConsumerProperties consumer; + @NotNull + private ProducerProperties producer; public ConsumerProperties getConsumerProperties() @@ -32,6 +34,11 @@ public class ApplicationProperties return consumer; } + public ProducerProperties getProducerProperties() + { + return producer; + } + @Validated @Getter @@ -49,4 +56,24 @@ public class ApplicationProperties enum OffsetReset { latest, earliest, none } } + + @Validated + @Getter + @Setter + static class ProducerProperties + { + @NotNull + @NotEmpty + private String topic; + @NotNull + @NotEmpty + private String acks; + @NotNull + private Integer batchSize; + @NotNull + private Integer lingerMs; + @NotNull + @NotEmpty + private String compressionType; + } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 77f051e4..f015cf00 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,11 +5,14 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.*; +import java.util.concurrent.Phaser; @Slf4j @@ -21,8 +24,18 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene private final Thread workerThread; private final Runnable closeCallback; + private final String stateTopic; + private final Producer producer; + + private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); + private volatile State[] partitionStates; + private Map[] restoredState; private CounterState[] counterState; + private volatile long[] stateEndOffsets; + private volatile int[] seen; + private volatile int[] acked; + private volatile boolean[] done; private long consumed = 0; @@ -30,11 +43,15 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene String clientId, String topic, Consumer consumer, + String stateTopic, + Producer producer, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; + this.stateTopic = stateTopic; + this.producer = producer; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -51,25 +68,67 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene log.info("{} - Fetching PartitionInfo for topic {}", id, topic); int numPartitions = consumer.partitionsFor(topic).size(); log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + partitionStates = new State[numPartitions]; + for (int i=0; i records = consumer.poll(Duration.ofSeconds(1)); - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } + int phase = phaser.getPhase(); + + assignedPartitions + .forEach(partition -> + { + seen[partition.partition()] = 0; + acked[partition.partition()] = 0; + done[partition.partition()] = false; + }); + + log.info("{} - Received {} messages in phase {}", id, records.count(), phase); + records + .partitions() + .forEach(partition -> + { + for (ConsumerRecord record : records.records(partition)) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + + checkRestoreProgress(partition); + + done[partition.partition()] = true; + }); + + assignedPartitions + .forEach(partition -> + { + if (seen[partition.partition()] == 0) + { + int arrivedPhase = phaser.arrive(); + log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase); + } + }); + + int arrivedPhase = phaser.arriveAndAwaitAdvance(); + log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase); } } catch(WakeupException e) @@ -101,8 +160,59 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + if (topic.equals(this.topic)) + { + handleMessage(partition, key); + } + else + { + handleState(partition, key, value); + } + } + + private void checkRestoreProgress(TopicPartition topicPartition) + { + int partition = topicPartition.partition(); + + if (partitionStates[partition] == State.RESTORING) + { + long consumerPosition = consumer.position(topicPartition); + + if (consumerPosition + 1 >= stateEndOffsets[partition]) + { + log.info( + "{} - Position of consumer is {}. Restoring of state for partition {} done!", + id, + consumerPosition, + topicPartition); + stateAssigned(partition); + } + else + { + log.debug( + "{} - Restored state up to offset {}, end-offset: {}", + id, + consumerPosition, + stateEndOffsets[partition]); + } + } + } + + private synchronized void handleState( + int partition, + K key, + V value) + { + restoredState[partition].put(key, Long.parseLong(value.toString())); // << An Typisierung anpassen! + } + + private void handleMessage( + Integer partition, + K key) + { Long counter = computeCount(partition, key); log.info("{} - current value for counter {}: {}", id, key, counter); + sendCounterState(partition, key, counter); } private synchronized Long computeCount(int partition, K key) @@ -117,6 +227,82 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene return result; } + void sendCounterState(int partition, K key, Long counter) + { + seen[partition]++; + + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + stateTopic, // Topic + key, // Key + counter.toString() // Value << TODO: An Typisierung anpassen + ); + + producer.send(record, (metadata, e) -> + { + long now = System.currentTimeMillis(); + if (e == null) + { + // HANDLE SUCCESS + log.debug( + "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + } + else + { + // HANDLE ERROR + log.error( + "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}", + id, + record.key(), + record.value(), + metadata == null ? -1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + + acked[partition]++; + if (done[partition] && !(acked[partition] < seen[partition])) + { + int arrivedPhase = phaser.arrive(); + log.debug( + "{} - Arrived at phase {} for partition {}, seen={}, acked={}", + id, + arrivedPhase, + partition, + seen[partition], + acked[partition]); + } + else + { + log.debug( + "{} - Still in phase {} for partition {}, seen={}, acked={}", + id, + phaser.getPhase(), + partition, + seen[partition], + acked[partition]); + } + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued message {}={}, latency={}ms", + id, + record.key(), + record.value(), + now - time + ); + } @Override public void onPartitionsAssigned(Collection partitions) @@ -124,11 +310,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene partitions .stream() .filter(partition -> partition.topic().equals(topic)) - .forEach(partition -> - { - assignedPartitions.add(partition); - counterState[partition.partition()] = new CounterState<>(new HashMap<>()); - }); + .forEach(partition -> restoreAndAssign(partition.partition())); } @Override @@ -137,11 +319,132 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene partitions .stream() .filter(partition -> partition.topic().equals(topic)) - .forEach(partition -> - { - assignedPartitions.remove(partition); - counterState[partition.partition()] = null; - }); + .forEach(partition -> revoke(partition.partition())); + } + + private void restoreAndAssign(int partition) + { + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + + long stateEndOffset = consumer + .endOffsets(List.of(statePartition)) + .get(statePartition) + .longValue(); + + long stateBeginningOffset = consumer + .beginningOffsets(List.of(statePartition)) + .get(statePartition); + + log.info( + "{} - Found beginning-offset {} and end-offset {} for state partition {}", + id, + stateBeginningOffset, + stateEndOffset, + partition); + + if (stateBeginningOffset < stateEndOffset) + { + stateRestoring(partition, stateBeginningOffset, stateEndOffset); + } + else + { + log.info("{} - No state available for partition {}", id, partition); + restoredState[partition] = new HashMap<>(); + stateAssigned(partition); + } + } + + private void revoke(int partition) + { + State partitionState = partitionStates[partition]; + switch (partitionState) + { + case RESTORING, ASSIGNED -> stateUnassigned(partition); + case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState); + } + } + + private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset) + { + log.info( + "{} - Changing partition-state for {}: {} -> RESTORING", + id, + partition, + partitionStates[partition]); + partitionStates[partition] = State.RESTORING; + + TopicPartition messagePartition = new TopicPartition(this.topic, partition); + log.info("{} - Pausing message partition {}", id, messagePartition); + consumer.pause(List.of(messagePartition)); + + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + log.info( + "{} - Seeking to first offset {} for state partition {}", + id, + stateBeginningOffset, + statePartition); + consumer.seek(statePartition, stateBeginningOffset); + stateEndOffsets[partition] = stateEndOffset; + restoredState[partition] = new HashMap<>(); + log.info("{} - Resuming state partition {}", id, statePartition); + consumer.resume(List.of(statePartition)); + } + + private void stateAssigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> ASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = State.ASSIGNED; + + TopicPartition statePartition = new TopicPartition(stateTopic, partition); + log.info("{} - Pausing state partition {}...", id, statePartition); + consumer.pause(List.of(statePartition)); + counterState[partition] = new CounterState(restoredState[partition]); + restoredState[partition] = null; + + TopicPartition messagePartition = new TopicPartition(topic, partition); + log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition); + assignedPartitions.add(messagePartition); + phaser.register(); + log.info( + "{} - Registered new party for newly assigned partition {}. New total number of parties: {}", + id, + messagePartition, + phaser.getRegisteredParties()); + log.info("{} - Resuming message partition {}...", id, messagePartition); + consumer.resume(List.of(messagePartition)); + } + + private void stateUnassigned(int partition) + { + State oldPartitionState = partitionStates[partition]; + + log.info( + "{} - State-change for partition {}: {} -> UNASSIGNED", + id, + partition, + oldPartitionState); + + partitionStates[partition] = State.UNASSIGNED; + + if (oldPartitionState == State.ASSIGNED) + { + TopicPartition messagePartition = new TopicPartition(topic, partition); + log.info("{} - Revoking partition {}", id, messagePartition); + assignedPartitions.remove(messagePartition); + counterState[partition] = null; + + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered party for revoked partition {}. New total number of parties: {}", + id, + messagePartition, + phaser.getRegisteredParties()); + } } @@ -152,4 +455,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListene log.info("{} - Joining the worker thread", id); workerThread.join(); } + + enum State + { + UNASSIGNED, + RESTORING, + ASSIGNED + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..d9e7066e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,12 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s + producer: + topic: state + acks: -1 + batch-size: 16384 + linger-ms: 0 + compression-type: gzip management: endpoint: shutdown: @@ -28,6 +34,12 @@ info: topic: ${juplo.consumer.topic} auto-offset-reset: ${juplo.consumer.auto-offset-reset} auto-commit-interval: ${juplo.consumer.auto-commit-interval} + producer: + topic: ${juplo.producer.topic} + acks: ${juplo.producer.acks} + batch-size: ${juplo.producer.batch-size} + linger-ms: ${juplo.producer.linger-ms} + compression-type: ${juplo.producer.compression-type} logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index b427efd1..e8a9a071 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -9,8 +9,7 @@ import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; -import static de.juplo.kafka.ApplicationTests.PARTITIONS; -import static de.juplo.kafka.ApplicationTests.TOPIC; +import static de.juplo.kafka.ApplicationTests.*; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; @@ -21,12 +20,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.consumer.topic=" + TOPIC }) + "juplo.consumer.topic=" + TOPIC_IN}) @AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) public class ApplicationTests { - static final String TOPIC = "FOO"; + static final String TOPIC_IN = "FOO"; + static final String TOPIC_OUT = "BAR"; static final int PARTITIONS = 10; @Autowired -- 2.20.1