From f6f212e317f8197d9245c6c19cfc4b9984442d6a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 17:11:14 +0100 Subject: [PATCH] Refactor: Enum `PartitionState` in `State` umbenannt --- .../java/de/juplo/kafka/ExampleConsumer.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 39b120b..0e66528 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -29,7 +29,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private volatile boolean running = false; private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); - private volatile PartitionState[] partitionStates; + private volatile State[] partitionStates; private Map[] restoredState; private CounterState[] counterState; private volatile long[] stateEndOffsets; @@ -65,10 +65,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{} - Fetching PartitionInfo for topic {}", id, topic); int numPartitions = consumer.partitionsFor(topic).size(); log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); - partitionStates = new PartitionState[numPartitions]; + partitionStates = new State[numPartitions]; for (int i=0; i stateUnassigned(partition); @@ -352,7 +352,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener id, partition, partitionStates[partition]); - partitionStates[partition] = PartitionState.RESTORING; + partitionStates[partition] = State.RESTORING; TopicPartition messagePartition = new TopicPartition(this.topic, partition); log.info("{} - Pausing message partition {}", id, messagePartition); @@ -379,7 +379,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partition, partitionStates[partition]); - partitionStates[partition] = PartitionState.ASSIGNED; + partitionStates[partition] = State.ASSIGNED; TopicPartition statePartition = new TopicPartition(stateTopic, partition); log.info("{} - Pausing state partition {}...", id, statePartition); @@ -402,7 +402,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void stateUnassigned(int partition) { - PartitionState oldPartitionState = partitionStates[partition]; + State oldPartitionState = partitionStates[partition]; log.info( "{} - State-change for partition {}: {} -> UNASSIGNED", @@ -410,9 +410,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partition, oldPartitionState); - partitionStates[partition] = PartitionState.UNASSIGNED; + partitionStates[partition] = State.UNASSIGNED; - if (oldPartitionState == PartitionState.ASSIGNED) + if (oldPartitionState == State.ASSIGNED) { TopicPartition messagePartition = new TopicPartition(topic, partition); log.info("{} - Revoking partition {}", id, messagePartition); @@ -437,7 +437,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener workerThread.join(); } - enum PartitionState + enum State { UNASSIGNED, RESTORING, -- 2.20.1