X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=aa00737adc0717d8cf62de2bcb0ad47f5832968c;hp=2ef7ee3db25c1630103a5b991b1994ac032d8491;hb=2432aeedb30ac4c1405045514d8eacb791a4d352;hpb=c64f93de3e59af674885fdad08c521d82f4802d1 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 2ef7ee3..aa00737 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -21,7 +21,9 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -51,6 +53,9 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final Map instanceIdUriMapping; private final String[] instanceIdByPartition; + private Clock clock; + private int stateStoreInterval; + private volatile boolean partitionOwnershipUnknown = true; @@ -61,6 +66,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + Clock clock, + int stateStoreInterval, ObjectMapper mapper, ConsumerUseCases productionUseCases, ConsumerUseCases restoreUseCases) @@ -82,6 +89,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener this.consumer = consumer; this.adminClient = adminClient; this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; this.mapper = mapper; this.productionUseCases = productionUseCases; this.restoreUseCases = restoreUseCases; @@ -91,6 +100,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try @@ -101,6 +112,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.debug("polled {} records", records.count()); records.forEach(record -> handleRecord(record, productionUseCases)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) + { + Map offsets = new HashMap<>(); + + for (TopicPartition topicPartition : consumer.assignment()) + { + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); + } + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { @@ -196,6 +226,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { partitionOwnershipUnknown = true; log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } } @Override @@ -204,7 +241,17 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.info("partitions assigned: {}", partitions); fetchAssignmentsAsync(); if (partitions.size() > 0) + { + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + } + restore(partitions); + } } private void fetchAssignmentsAsync() @@ -249,11 +296,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { log.info("--> starting restore..."); - partitions - .stream() - .map(topicPartition -> topicPartition.partition()) - .forEach(partition -> repository.resetStorageForPartition(partition)); - Map lastSeen = consumer .endOffsets(partitions) @@ -269,7 +311,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener .stream() .collect(Collectors.toMap( partition -> partition, - partition -> 0l)); + partition -> repository.storedPosition(partition))); while ( positions