From: Kai Moritz Date: Sat, 2 Nov 2024 10:21:28 +0000 (+0100) Subject: Refactor: DRY für state-change zu ASSIGNED X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=706e49d6e8ed074f69828d70a89f422ebcbd5a75;p=demos%2Fkafka%2Ftraining Refactor: DRY für state-change zu ASSIGNED --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 0b57140..b22f5cc 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -177,19 +177,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener counterState.setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { - log.info( - "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition); - partitionStates[partition] = PartitionState.ASSIGNED; - - TopicPartition tp; - - tp = new TopicPartition(stateTopic, partition); - log.info("{} - Pausing state partition {}...", id, tp); - consumer.pause(List.of(tp)); - - tp = new TopicPartition(topic, partition); - log.info("{} - Resuming message partition {}...", id, tp); - consumer.resume(List.of(tp)); + log.info("{} - Restoring of state for partition {} done!", id, partition); + stateAssigned(partition); } else { @@ -360,17 +349,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } else { - log.info( - "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED", - id, - messagePartition); - partitionStates[partition] = PartitionState.ASSIGNED; - - log.info("{} - Pausing state partition {}...", id, statePartition); - consumer.pause(List.of(statePartition)); - - log.info("{} - Resuming message partition {}...", id, messagePartition); - consumer.resume(List.of(messagePartition)); + log.info("{} - State is up-to-date for message partition {}", id, messagePartition); + stateAssigned(partition); } }); } @@ -406,6 +386,25 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener }); } + private void stateAssigned(int partition) + { + log.info( + "{} - State-change for partition {}: {} -> ASSIGNED", + id, + partition, + partitionStates[partition]); + + partitionStates[partition] = PartitionState.ASSIGNED; + + TopicPartition statePartition = new TopicPartition(stateTopic, partition); + log.info("{} - Pausing state partition {}...", id, statePartition); + consumer.pause(List.of(statePartition)); + + TopicPartition messagePartition = new TopicPartition(topic, partition); + log.info("{} - Resuming message partition {}...", id, messagePartition); + consumer.resume(List.of(messagePartition)); + } + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id);