From 15e6e317e4568c3e9197889e72b2e70a2804ad2f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 11:33:53 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20DRY=20f=C3=BCr=20state-change=20zu?= =?utf8?q?=20UNASSIGNED?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 2dec41d..b08b209 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -368,15 +368,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener PartitionState partitionState = partitionStates[partition.partition()]; switch (partitionState) { - case RESTORING: - case ASSIGNED: - phaser.arriveAndDeregister(); - log.info( - "{} - Deregistered party for revoked partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - partitionStates[partition.partition()] = PartitionState.UNASSIGNED; + case RESTORING, ASSIGNED: + stateUnassigned(partition.partition()); break; default: case UNASSIGNED: @@ -405,6 +398,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.resume(List.of(messagePartition)); } + private void stateUnassigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> UNASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = PartitionState.UNASSIGNED; + + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered party for revoked partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id); -- 2.20.1