From: Kai Moritz Date: Sat, 2 Nov 2024 10:33:53 +0000 (+0100) Subject: Refactor: DRY für state-change zu UNASSIGNED X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=8e75d703a87972b913ba9410c62b21b58c9041c6;p=demos%2Fkafka%2Ftraining Refactor: DRY für state-change zu UNASSIGNED --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b22f5cc..e687a36 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -368,15 +368,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener PartitionState partitionState = partitionStates[partition.partition()]; switch (partitionState) { - case RESTORING: - case ASSIGNED: - phaser.arriveAndDeregister(); - log.info( - "{} - Deregistered party for revoked partition {}. New total number of parties: {}", - id, - partition, - phaser.getRegisteredParties()); - partitionStates[partition.partition()] = PartitionState.UNASSIGNED; + case RESTORING, ASSIGNED: + stateUnassigned(partition.partition()); break; default: case UNASSIGNED: @@ -405,6 +398,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.resume(List.of(messagePartition)); } + private void stateUnassigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> UNASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = PartitionState.UNASSIGNED; + + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered party for revoked partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + } + + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id);