From: Kai Moritz Date: Sat, 2 Nov 2024 16:11:14 +0000 (+0100) Subject: Refactor: Enum `PartitionState` in `State` umbenannt X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7a53df5daeb3c7defa0bb93c95906c60e06a2ff4;p=demos%2Fkafka%2Ftraining Refactor: Enum `PartitionState` in `State` umbenannt --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 490fbdc..a0a91b5 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -30,7 +30,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private volatile boolean running = false; private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); - private volatile PartitionState[] partitionStates; + private volatile State[] partitionStates; private Map[] restoredState; private CounterState[] counterState; private volatile long[] stateEndOffsets; @@ -69,10 +69,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{} - Fetching PartitionInfo for topic {}", id, topic); int numPartitions = consumer.partitionsFor(topic).size(); log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); - partitionStates = new PartitionState[numPartitions]; + partitionStates = new State[numPartitions]; for (int i=0; i stateUnassigned(partition); @@ -360,7 +360,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener id, partition, partitionStates[partition]); - partitionStates[partition] = PartitionState.RESTORING; + partitionStates[partition] = State.RESTORING; TopicPartition messagePartition = new TopicPartition(this.topic, partition); log.info("{} - Pausing message partition {}", id, messagePartition); @@ -387,7 +387,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partition, partitionStates[partition]); - partitionStates[partition] = PartitionState.ASSIGNED; + partitionStates[partition] = State.ASSIGNED; TopicPartition statePartition = new TopicPartition(stateTopic, partition); log.info("{} - Pausing state partition {}...", id, statePartition); @@ -410,7 +410,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void stateUnassigned(int partition) { - PartitionState oldPartitionState = partitionStates[partition]; + State oldPartitionState = partitionStates[partition]; log.info( "{} - State-change for partition {}: {} -> UNASSIGNED", @@ -418,9 +418,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partition, oldPartitionState); - partitionStates[partition] = PartitionState.UNASSIGNED; + partitionStates[partition] = State.UNASSIGNED; - if (oldPartitionState == PartitionState.ASSIGNED) + if (oldPartitionState == State.ASSIGNED) { TopicPartition messagePartition = new TopicPartition(topic, partition); log.info("{} - Revoking partition {}", id, messagePartition); @@ -445,7 +445,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener workerThread.join(); } - enum PartitionState + enum State { UNASSIGNED, RESTORING,