X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=1cae540ff24f5a2d7d2dea4f10e182060cd871c5;hp=2ef7ee3db25c1630103a5b991b1994ac032d8491;hb=9b5a2fb9d42baeb4ddde2ac146e2f8e61a1fc550;hpb=c64f93de3e59af674885fdad08c521d82f4802d1 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 2ef7ee3..1cae540 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; @@ -21,12 +22,14 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.stream.Collectors; +import java.util.function.Consumer; @RequestMapping("/consumer") @@ -40,7 +43,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final AdminClient adminClient; private final TransferRepository repository; private final ObjectMapper mapper; - private final ConsumerUseCases productionUseCases, restoreUseCases; + private final ConsumerUseCases restoreUseCases; private boolean running = false; private boolean shutdown = false; @@ -51,6 +54,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final Map instanceIdUriMapping; private final String[] instanceIdByPartition; + + private Clock clock; + private int stateStoreInterval; + + private final Consumer> productionRecordHandler; + private final Consumer> recordHandlers[]; + private volatile boolean partitionOwnershipUnknown = true; @@ -61,6 +71,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + Clock clock, + int stateStoreInterval, ObjectMapper mapper, ConsumerUseCases productionUseCases, ConsumerUseCases restoreUseCases) @@ -82,15 +94,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener this.consumer = consumer; this.adminClient = adminClient; this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; this.mapper = mapper; - this.productionUseCases = productionUseCases; this.restoreUseCases = restoreUseCases; + + productionRecordHandler = (record) -> handleRecord(record, productionUseCases); + this.recordHandlers = new Consumer[numPartitions]; } @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try @@ -100,7 +118,26 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener continue; log.debug("polled {} records", records.count()); - records.forEach(record -> handleRecord(record, productionUseCases)); + records.forEach(record -> recordHandlers[record.partition()].accept(record)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) + { + Map offsets = new HashMap<>(); + + for (TopicPartition topicPartition : consumer.assignment()) + { + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); + } + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { @@ -161,6 +198,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener } + /** + * Identifies the URI, at which the Group-Instance can be reached, + * that holds the state for a specific {@link Transfer}. + * + * The {@link Transfer#getId() ID} of the {@link Transfer} is named + * {@code key} here and of type {@code String}, because this example + * project stores the key as a String in Kafka to simplify the listing + * and manual manipulation of the according topic. + * + * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}. + * @return An {@link Optional}, that holds the URI at which the Group-Instance + * can be reached, that holds the state for the {@link Transfer}, that + * is identified by the key (if present), or is empty, if the {@link Transfer} + * would be handled by the local instance. + */ public Optional uriForKey(String key) { synchronized (this) @@ -196,6 +248,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { partitionOwnershipUnknown = true; log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } } @Override @@ -204,7 +263,27 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.info("partitions assigned: {}", partitions); fetchAssignmentsAsync(); if (partitions.size() > 0) - restore(partitions); + { + for (Map.Entry entry : consumer.endOffsets(partitions).entrySet()) + { + TopicPartition topicPartition = entry.getKey(); + Integer partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + Long endOffset = entry.getValue(); + if (offset < endOffset) + { + log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset); + recordHandlers[partition] = new RestoreRecordHandler(endOffset); + } + else + { + log.info("--> partition {} is up-to-date, offset: {}", partition, offset); + recordHandlers[partition] = productionRecordHandler; + } + } + } } private void fetchAssignmentsAsync() @@ -245,61 +324,45 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener } - private void restore(Collection partitions) + class RestoreRecordHandler implements Consumer> { - log.info("--> starting restore..."); - - partitions - .stream() - .map(topicPartition -> topicPartition.partition()) - .forEach(partition -> repository.resetStorageForPartition(partition)); - - Map lastSeen = - consumer - .endOffsets(partitions) - .entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey().partition(), - entry -> entry.getValue() - 1)); - - Map positions = - lastSeen - .keySet() - .stream() - .collect(Collectors.toMap( - partition -> partition, - partition -> 0l)); - - while ( - positions - .entrySet() - .stream() - .map(entry -> entry.getValue() < lastSeen.get(entry.getKey())) - .reduce(false, (a, b) -> a || b)) + final long seen; + + + RestoreRecordHandler(Long endOffset) { - try - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - if (records.count() == 0) - continue; + this.seen = endOffset - 1; + } - log.debug("polled {} records", records.count()); - records.forEach(record -> - { - handleRecord(record, restoreUseCases); - positions.put(record.partition(), record.offset()); - }); + + @Override + public void accept(ConsumerRecord record) + { + if (seen < record.offset()) + { + int partition = record.partition(); + log.info( + "--> restore of partition {} completed: needed={}, seen={}!", + partition, + seen, + record.offset()); + recordHandlers[partition] = productionRecordHandler; + productionRecordHandler.accept(record); } - catch(WakeupException e) + else { - log.info("--> cleanly interrupted while restoring"); + handleRecord(record, restoreUseCases); + if (seen == record.offset()) + { + int partition = record.partition(); + log.info( "--> restore of partition {} completed!", partition); + recordHandlers[partition] = productionRecordHandler; + } } } - - log.info("--> restore completed!"); } + @PostMapping("start") public synchronized String start() {