From: Kai Moritz Date: Sun, 27 Jun 2021 13:19:42 +0000 (+0200) Subject: The restore-process no longer happens inside onPartitionsAssigned() X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9b5a2fb9d42baeb4ddde2ac146e2f8e61a1fc550;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer The restore-process no longer happens inside onPartitionsAssigned() --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 920d79a..1cae540 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -29,7 +29,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.stream.Collectors; +import java.util.function.Consumer; @RequestMapping("/consumer") @@ -43,7 +43,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final AdminClient adminClient; private final TransferRepository repository; private final ObjectMapper mapper; - private final ConsumerUseCases productionUseCases, restoreUseCases; + private final ConsumerUseCases restoreUseCases; private boolean running = false; private boolean shutdown = false; @@ -54,9 +54,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final Map instanceIdUriMapping; private final String[] instanceIdByPartition; + private Clock clock; private int stateStoreInterval; + private final Consumer> productionRecordHandler; + private final Consumer> recordHandlers[]; + private volatile boolean partitionOwnershipUnknown = true; @@ -93,8 +97,10 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener this.clock = clock; this.stateStoreInterval = stateStoreInterval; this.mapper = mapper; - this.productionUseCases = productionUseCases; this.restoreUseCases = restoreUseCases; + + productionRecordHandler = (record) -> handleRecord(record, productionUseCases); + this.recordHandlers = new Consumer[numPartitions]; } @@ -112,7 +118,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener continue; log.debug("polled {} records", records.count()); - records.forEach(record -> handleRecord(record, productionUseCases)); + records.forEach(record -> recordHandlers[record.partition()].accept(record)); Instant now = clock.instant(); if ( @@ -258,15 +264,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener fetchAssignmentsAsync(); if (partitions.size() > 0) { - for (TopicPartition topicPartition : partitions) + for (Map.Entry entry : consumer.endOffsets(partitions).entrySet()) { - int partition = topicPartition.partition(); + TopicPartition topicPartition = entry.getKey(); + Integer partition = topicPartition.partition(); long offset = repository.activatePartition(partition); log.info("activated partition {}, seeking to offset {}", partition, offset); consumer.seek(topicPartition, offset); + Long endOffset = entry.getValue(); + if (offset < endOffset) + { + log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset); + recordHandlers[partition] = new RestoreRecordHandler(endOffset); + } + else + { + log.info("--> partition {} is up-to-date, offset: {}", partition, offset); + recordHandlers[partition] = productionRecordHandler; + } } - - restore(partitions); } } @@ -308,56 +324,45 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener } - private void restore(Collection partitions) + class RestoreRecordHandler implements Consumer> { - log.info("--> starting restore..."); - - Map lastSeen = - consumer - .endOffsets(partitions) - .entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey().partition(), - entry -> entry.getValue() - 1)); - - Map positions = - lastSeen - .keySet() - .stream() - .collect(Collectors.toMap( - partition -> partition, - partition -> repository.storedPosition(partition))); - - while ( - positions - .entrySet() - .stream() - .map(entry -> entry.getValue() < lastSeen.get(entry.getKey())) - .reduce(false, (a, b) -> a || b)) + final long seen; + + + RestoreRecordHandler(Long endOffset) { - try - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - if (records.count() == 0) - continue; + this.seen = endOffset - 1; + } - log.debug("polled {} records", records.count()); - records.forEach(record -> - { - handleRecord(record, restoreUseCases); - positions.put(record.partition(), record.offset()); - }); + + @Override + public void accept(ConsumerRecord record) + { + if (seen < record.offset()) + { + int partition = record.partition(); + log.info( + "--> restore of partition {} completed: needed={}, seen={}!", + partition, + seen, + record.offset()); + recordHandlers[partition] = productionRecordHandler; + productionRecordHandler.accept(record); } - catch(WakeupException e) + else { - log.info("--> cleanly interrupted while restoring"); + handleRecord(record, restoreUseCases); + if (seen == record.offset()) + { + int partition = record.partition(); + log.info( "--> restore of partition {} completed!", partition); + recordHandlers[partition] = productionRecordHandler; + } } } - - log.info("--> restore completed!"); } + @PostMapping("start") public synchronized String start() {