From 20e81d81c1c14965a4ee949bb640154845f9e949 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 11:33:53 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20DRY=20f=C3=BCr=20state-change=20zu?= =?utf8?q?=20UNASSIGNED?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f1675b4..3552689 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -360,15 +360,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener PartitionState partitionState = partitionStates[partition.partition()]; switch (partitionState) { - case RESTORING: - case ASSIGNED: - phaser.arriveAndDeregister(); - log.info( - "{} - Deregistered partie for revoked partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - partitionStates[partition.partition()] = PartitionState.UNASSIGNED; + case RESTORING, ASSIGNED: + stateUnassigned(partition.partition()); break; default: case UNASSIGNED: @@ -397,6 +390,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.resume(List.of(messagePartition)); } + private void stateUnassigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> UNASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = PartitionState.UNASSIGNED; + + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered partie for revoked partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id); -- 2.20.1