From: Kai Moritz Date: Sat, 2 Nov 2024 10:21:28 +0000 (+0100) Subject: Refactor: DRY für state-change zu ASSIGNED X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4ce1b540bcd277f13c84a8efb67b1bf808b4668c;p=demos%2Fkafka%2Ftraining Refactor: DRY für state-change zu ASSIGNED --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index cb80710..f1675b4 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -169,19 +169,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener counterState.setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { - log.info( - "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition); - partitionStates[partition] = PartitionState.ASSIGNED; - - TopicPartition tp; - - tp = new TopicPartition(stateTopic, partition); - log.info("{} - Pausing state partition {}...", id, tp); - consumer.pause(List.of(tp)); - - tp = new TopicPartition(topic, partition); - log.info("{} - Resuming message partition {}...", id, tp); - consumer.resume(List.of(tp)); + log.info("{} - Restoring of state for partition {} done!", id, partition); + stateAssigned(partition); } else { @@ -352,17 +341,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } else { - log.info( - "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED", - id, - messagePartition); - partitionStates[partition] = PartitionState.ASSIGNED; - - log.info("{} - Pausing state partition {}...", id, statePartition); - consumer.pause(List.of(statePartition)); - - log.info("{} - Resuming message partition {}...", id, messagePartition); - consumer.resume(List.of(messagePartition)); + log.info("{} - State is up-to-date for message partition {}", id, messagePartition); + stateAssigned(partition); } }); } @@ -398,6 +378,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } + private void stateAssigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> ASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = PartitionState.ASSIGNED; + + TopicPartition statePartition = new TopicPartition(stateTopic, partition); + log.info("{} - Pausing state partition {}...", id, statePartition); + consumer.pause(List.of(statePartition)); + + TopicPartition messagePartition = new TopicPartition(topic, partition); + log.info("{} - Resuming message partition {}...", id, messagePartition); + consumer.resume(List.of(messagePartition)); + } + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id);