From: Kai Moritz Date: Mon, 28 Oct 2024 09:28:50 +0000 (+0100) Subject: Der Zustand wird aus dem ``state``-Topic wiederhergestellt X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3ea255f94414ceb23b56fd4f5d9a426f766e252c;p=demos%2Fkafka%2Ftraining Der Zustand wird aus dem ``state``-Topic wiederhergestellt --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c3c4e79..1cdf0a3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,7 +2,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -49,7 +49,7 @@ public class ApplicationConfiguration props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); } props.put("metadata.maxage.ms", 5000); // 5 Sekunden - props.put("partition.assignment.strategy", StickyAssignor.class.getName()); + props.put("partition.assignment.strategy", RangeAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 9485110..b90cb77 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -30,6 +30,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private volatile boolean running = false; private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); + private volatile PartitionState[] partitionStates; + private volatile long[] stateEndOffsets; private volatile int[] seen; private volatile int[] acked; private volatile boolean[] done; @@ -62,12 +64,18 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{} - Fetching PartitionInfo for topic {}", id, topic); int numPartitions = consumer.partitionsFor(topic).size(); log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + partitionStates = new PartitionState[numPartitions]; + for (int i=0; i { seen[partition.partition()] = 0; acked[partition.partition()] = 0; done[partition.partition()] = false; + }); + log.info("{} - Received {} messages in phase {}", id, records.count(), phase); + records + .partitions() + .forEach(partition -> + { for (ConsumerRecord record : records.records(partition)) { handleRecord( @@ -96,6 +109,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } done[partition.partition()] = true; + }); + + assignedPartitions + .forEach(partition -> + { if (seen[partition.partition()] == 0) { int arrivedPhase = phaser.arrive(); @@ -131,6 +149,56 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + + if (topic.equals(this.topic)) + { + handleMessage(partition, key); + } + else + { + handleState(partition, offset, key, value); + } + } + + private synchronized void handleState( + int partition, + long offset, + String key, + String value) + { + counterState.put(key, Long.parseLong(value)); + if (offset + 1 == stateEndOffsets[partition]) + { + log.info( + "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition); + partitionStates[partition] = PartitionState.ASSIGNED; + + TopicPartition tp; + + tp = new TopicPartition(stateTopic, partition); + log.info("{} - Pausing state partition {}...", id, tp); + consumer.pause(List.of(tp)); + + tp = new TopicPartition(topic, partition); + log.info("{} - Resuming message partition {}...", id, tp); + consumer.resume(List.of(tp)); + } + else + { + log.debug( + "{} - Restored state up to offset {}, end-offset: {}, state: {}={}", + id, + offset, + stateEndOffsets[partition], + key, + value); + } + } + + private void handleMessage( + Integer partition, + String key) + { Long counter = computeCount(key); log.info("{} - current value for counter {}: {}", id, key, counter); sendCounterState(partition, key, counter); @@ -226,25 +294,108 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public void onPartitionsAssigned(Collection partitions) { - assignedPartitions.addAll(partitions); - phaser.bulkRegister(partitions.size()); - log.info( - "{} - Added {} parties for newly assigned partitions. New total number of parties: {}", - id, - partitions.size(), - phaser.getRegisteredParties()); + partitions.forEach(partition -> + { + if (partition.topic().equals(topic)) + { + log.info("{} - Adding partition {}", id, partition); + assignedPartitions.add(partition); + + phaser.register(); + log.info( + "{} - Registered new partie for restored assigned partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + + log.info( + "{} - Changing partition-state for {}: {} -> RESTORING", + id, + partition, + partitionStates[partition.partition()]); + partitionStates[partition.partition()] = PartitionState.RESTORING; + } + else + { + long endOffset = consumer + .endOffsets(List.of(partition)) + .get(partition) + .longValue(); + log.info("{} - Found end-offset {} for state partition {}", id, endOffset, partition); + stateEndOffsets[partition.partition()] = endOffset; + } + }); + + assignedPartitions.forEach(messagePartition -> + { + int partition = messagePartition.partition(); + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); + + long stateBeginningOffset = consumer + .beginningOffsets(List.of(statePartition)) + .get(statePartition); + + if (stateBeginningOffset < stateEndOffsets[partition]) + { + log.info( + "{} - Seeking to first offset {} for state partition {}", + id, + stateBeginningOffset, + statePartition); + consumer.seek(statePartition, stateBeginningOffset); + + log.info("{} - Pausing message partition {}", id, messagePartition); + consumer.pause(List.of(messagePartition)); + + log.info("{} - Resuming state partition {}", id, statePartition); + consumer.resume(List.of(statePartition)); + } + else + { + log.info( + "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED", + id, + messagePartition); + partitionStates[partition] = PartitionState.ASSIGNED; + + log.info("{} - Pausing state partition {}...", id, statePartition); + consumer.pause(List.of(statePartition)); + + log.info("{} - Resuming message partition {}...", id, messagePartition); + consumer.resume(List.of(messagePartition)); + } + }); } @Override - public void onPartitionsRevoked(Collection partitions) + public synchronized void onPartitionsRevoked(Collection partitions) { - assignedPartitions.removeAll(partitions); - partitions.forEach(partition -> phaser.arriveAndDeregister()); - log.info( - "{} - Removed {} parties for revoked partitions. New total number of parties: {}", - id, - partitions.size(), - phaser.getRegisteredParties()); + partitions.forEach(partition -> + { + if (partition.topic().equals(topic)) + { + log.info("{} - Revoking partition {}", id, partition); + assignedPartitions.remove(partition); + + PartitionState partitionState = partitionStates[partition.partition()]; + switch (partitionState) + { + case RESTORING: + case ASSIGNED: + phaser.arriveAndDeregister(); + log.info( + "{} - Deregistered partie for revoked partition {}. New total number of parties: {}", + id, + partition, + phaser.getRegisteredParties()); + partitionStates[partition.partition()] = PartitionState.UNASSIGNED; + break; + default: + case UNASSIGNED: + log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState); + } + } + }); } public void shutdown() throws InterruptedException @@ -254,4 +405,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.wakeup(); workerThread.join(); } + + enum PartitionState + { + UNASSIGNED, + RESTORING, + ASSIGNED + } }