From: Kai Moritz <kai@juplo.de> Date: Sat, 2 Nov 2024 16:11:14 +0000 (+0100) Subject: Refactor: Enum `PartitionState` in `State` umbenannt X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-signal~4 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=7a53df5daeb3c7defa0bb93c95906c60e06a2ff4;p=demos%2Fkafka%2Ftraining Refactor: Enum `PartitionState` in `State` umbenannt --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 490fbdc7..a0a91b5d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -30,7 +30,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private volatile boolean running = false; private final Phaser phaser = new Phaser(1); private final Set<TopicPartition> assignedPartitions = new HashSet<>(); - private volatile PartitionState[] partitionStates; + private volatile State[] partitionStates; private Map<String, Long>[] restoredState; private CounterState[] counterState; private volatile long[] stateEndOffsets; @@ -69,10 +69,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{} - Fetching PartitionInfo for topic {}", id, topic); int numPartitions = consumer.partitionsFor(topic).size(); log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); - partitionStates = new PartitionState[numPartitions]; + partitionStates = new State[numPartitions]; for (int i=0; i<numPartitions; i++) { - partitionStates[i] = PartitionState.UNASSIGNED; + partitionStates[i] = State.UNASSIGNED; } restoredState = new Map[numPartitions]; counterState = new CounterState[numPartitions]; @@ -345,7 +345,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void revoke(int partition) { - PartitionState partitionState = partitionStates[partition]; + State partitionState = partitionStates[partition]; switch (partitionState) { case RESTORING, ASSIGNED -> stateUnassigned(partition); @@ -360,7 +360,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener id, partition, partitionStates[partition]); - partitionStates[partition] = PartitionState.RESTORING; + partitionStates[partition] = State.RESTORING; TopicPartition messagePartition = new TopicPartition(this.topic, partition); log.info("{} - Pausing message partition {}", id, messagePartition); @@ -387,7 +387,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partition, partitionStates[partition]); - partitionStates[partition] = PartitionState.ASSIGNED; + partitionStates[partition] = State.ASSIGNED; TopicPartition statePartition = new TopicPartition(stateTopic, partition); log.info("{} - Pausing state partition {}...", id, statePartition); @@ -410,7 +410,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void stateUnassigned(int partition) { - PartitionState oldPartitionState = partitionStates[partition]; + State oldPartitionState = partitionStates[partition]; log.info( "{} - State-change for partition {}: {} -> UNASSIGNED", @@ -418,9 +418,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener partition, oldPartitionState); - partitionStates[partition] = PartitionState.UNASSIGNED; + partitionStates[partition] = State.UNASSIGNED; - if (oldPartitionState == PartitionState.ASSIGNED) + if (oldPartitionState == State.ASSIGNED) { TopicPartition messagePartition = new TopicPartition(topic, partition); log.info("{} - Revoking partition {}", id, messagePartition); @@ -445,7 +445,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener workerThread.join(); } - enum PartitionState + enum State { UNASSIGNED, RESTORING,