X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=aa00737adc0717d8cf62de2bcb0ad47f5832968c;hb=2432aeedb30ac4c1405045514d8eacb791a4d352;hp=501bfd0593a133bb3d3abeb7cc394f1710ac038c;hpb=43ea59755f9673864a3ef95250009f091e99a760;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 501bfd0..aa00737 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -21,7 +21,9 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -51,6 +53,9 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final Map instanceIdUriMapping; private final String[] instanceIdByPartition; + private Clock clock; + private int stateStoreInterval; + private volatile boolean partitionOwnershipUnknown = true; @@ -61,6 +66,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + Clock clock, + int stateStoreInterval, ObjectMapper mapper, ConsumerUseCases productionUseCases, ConsumerUseCases restoreUseCases) @@ -82,6 +89,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener this.consumer = consumer; this.adminClient = adminClient; this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; this.mapper = mapper; this.productionUseCases = productionUseCases; this.restoreUseCases = restoreUseCases; @@ -91,6 +100,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try @@ -101,6 +112,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.debug("polled {} records", records.count()); records.forEach(record -> handleRecord(record, productionUseCases)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) + { + Map offsets = new HashMap<>(); + + for (TopicPartition topicPartition : consumer.assignment()) + { + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); + } + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { @@ -187,6 +217,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener // Hence, the app would not react to any signal (CTRL-C, for example) except // a KILL until the restoring is finished. future = CompletableFuture.runAsync(() -> start()); + log.info("start of application completed"); } @@ -195,6 +226,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { partitionOwnershipUnknown = true; log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } } @Override @@ -203,7 +241,17 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.info("partitions assigned: {}", partitions); fetchAssignmentsAsync(); if (partitions.size() > 0) + { + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + } + restore(partitions); + } } private void fetchAssignmentsAsync() @@ -248,11 +296,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { log.info("--> starting restore..."); - partitions - .stream() - .map(topicPartition -> topicPartition.partition()) - .forEach(partition -> repository.resetStorageForPartition(partition)); - Map lastSeen = consumer .endOffsets(partitions) @@ -268,7 +311,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener .stream() .collect(Collectors.toMap( partition -> partition, - partition -> 0l)); + partition -> repository.storedPosition(partition))); while ( positions @@ -304,7 +347,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { if (running) { - log.info("already running!"); + log.info("consumer already running!"); return "Already running!"; } @@ -325,7 +368,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener running = true; future = CompletableFuture.runAsync(this); - log.info("started"); + log.info("consumer started"); return "Started"; } @@ -334,7 +377,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { if (!running) { - log.info("not running!"); + log.info("consumer not running!"); return "Not running"; } @@ -359,7 +402,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener consumer.unsubscribe(); } - log.info("stopped"); + log.info("consumer stopped"); return "Stopped"; }