X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=920d79ae480b6bafdc889839fda7002abffb9ec5;hp=501bfd0593a133bb3d3abeb7cc394f1710ac038c;hb=19aec49a7f3a46f55e696a5a930c48883c4f1cd2;hpb=43ea59755f9673864a3ef95250009f091e99a760 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 501bfd0..920d79a 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; @@ -21,7 +22,9 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -51,6 +54,9 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final Map instanceIdUriMapping; private final String[] instanceIdByPartition; + private Clock clock; + private int stateStoreInterval; + private volatile boolean partitionOwnershipUnknown = true; @@ -61,6 +67,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + Clock clock, + int stateStoreInterval, ObjectMapper mapper, ConsumerUseCases productionUseCases, ConsumerUseCases restoreUseCases) @@ -82,6 +90,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener this.consumer = consumer; this.adminClient = adminClient; this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; this.mapper = mapper; this.productionUseCases = productionUseCases; this.restoreUseCases = restoreUseCases; @@ -91,6 +101,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try @@ -101,6 +113,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.debug("polled {} records", records.count()); records.forEach(record -> handleRecord(record, productionUseCases)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) + { + Map offsets = new HashMap<>(); + + for (TopicPartition topicPartition : consumer.assignment()) + { + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); + } + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { @@ -161,6 +192,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener } + /** + * Identifies the URI, at which the Group-Instance can be reached, + * that holds the state for a specific {@link Transfer}. + * + * The {@link Transfer#getId() ID} of the {@link Transfer} is named + * {@code key} here and of type {@code String}, because this example + * project stores the key as a String in Kafka to simplify the listing + * and manual manipulation of the according topic. + * + * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}. + * @return An {@link Optional}, that holds the URI at which the Group-Instance + * can be reached, that holds the state for the {@link Transfer}, that + * is identified by the key (if present), or is empty, if the {@link Transfer} + * would be handled by the local instance. + */ public Optional uriForKey(String key) { synchronized (this) @@ -187,6 +233,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener // Hence, the app would not react to any signal (CTRL-C, for example) except // a KILL until the restoring is finished. future = CompletableFuture.runAsync(() -> start()); + log.info("start of application completed"); } @@ -195,6 +242,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { partitionOwnershipUnknown = true; log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } } @Override @@ -203,7 +257,17 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.info("partitions assigned: {}", partitions); fetchAssignmentsAsync(); if (partitions.size() > 0) + { + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + } + restore(partitions); + } } private void fetchAssignmentsAsync() @@ -248,11 +312,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { log.info("--> starting restore..."); - partitions - .stream() - .map(topicPartition -> topicPartition.partition()) - .forEach(partition -> repository.resetStorageForPartition(partition)); - Map lastSeen = consumer .endOffsets(partitions) @@ -268,7 +327,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener .stream() .collect(Collectors.toMap( partition -> partition, - partition -> 0l)); + partition -> repository.storedPosition(partition))); while ( positions @@ -304,7 +363,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { if (running) { - log.info("already running!"); + log.info("consumer already running!"); return "Already running!"; } @@ -325,7 +384,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener running = true; future = CompletableFuture.runAsync(this); - log.info("started"); + log.info("consumer started"); return "Started"; } @@ -334,7 +393,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { if (!running) { - log.info("not running!"); + log.info("consumer not running!"); return "Not running"; } @@ -359,7 +418,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener consumer.unsubscribe(); } - log.info("stopped"); + log.info("consumer stopped"); return "Stopped"; }