X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=920d79ae480b6bafdc889839fda7002abffb9ec5;hp=63fbef5f523bb8a7d9cc1f838e4d51726ace008d;hb=19aec49a7f3a46f55e696a5a930c48883c4f1cd2;hpb=951dcf0533551cccf48062c12e61192035a27a9a

diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
index 63fbef5..920d79a 100644
--- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
+++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
@@ -2,11 +2,15 @@ package de.juplo.kafka.payment.transfer.adapter;

 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -18,36 +22,87 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;

+import java.time.Clock;
 import java.time.Duration;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;


 @RequestMapping("/consumer")
 @ResponseBody
-@RequiredArgsConstructor
 @Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
+  private final int numPartitions;
   private final KafkaConsumer<String, String> consumer;
-  private final ExecutorService executorService;
+  private final AdminClient adminClient;
+  private final TransferRepository repository;
   private final ObjectMapper mapper;
   private final ConsumerUseCases productionUseCases, restoreUseCases;

-  private boolean restoring = true;
   private boolean running = false;
   private boolean shutdown = false;
   private Future<?> future = null;

+  private final String groupId;
+  private final String groupInstanceId;
+  private final Map<String, String> instanceIdUriMapping;
+  private final String[] instanceIdByPartition;
+
+  private Clock clock;
+  private int stateStoreInterval;
+
+  private volatile boolean partitionOwnershipUnknown = true;
+
+
+  public TransferConsumer(
+      String topic,
+      int numPartitions,
+      Map<String, String> instanceIdUriMapping,
+      KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
+      Clock clock,
+      int stateStoreInterval,
+      ObjectMapper mapper,
+      ConsumerUseCases productionUseCases,
+      ConsumerUseCases restoreUseCases)
+  {
+    this.topic = topic;
+    this.numPartitions = numPartitions;
+    this.groupId = consumer.groupMetadata().groupId();
+    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+    this.instanceIdByPartition = new String[numPartitions];
+    this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+    for (String instanceId : instanceIdUriMapping.keySet())
+    {
+      // Requests are not redirected for the instance itself
+      String uri = instanceId.equals(groupInstanceId)
+          ? null
+          : instanceIdUriMapping.get(instanceId);
+      this.instanceIdUriMapping.put(instanceId, uri);
+    }
+    this.consumer = consumer;
+    this.adminClient = adminClient;
+    this.repository = repository;
+    this.clock = clock;
+    this.stateStoreInterval = stateStoreInterval;
+    this.mapper = mapper;
+    this.productionUseCases = productionUseCases;
+    this.restoreUseCases = restoreUseCases;
+  }
+

   @Override
   public void run()
   {
+    Instant stateStored = clock.instant();
+
     while (running)
     {
       try
@@ -58,6 +113,25 @@ public class TransferConsumer implements Runnable

         log.debug("polled {} records", records.count());
         records.forEach(record -> handleRecord(record, productionUseCases));
+
+        Instant now = clock.instant();
+        if (
+            stateStoreInterval > 0 &&
+            Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+        {
+          Map<Integer, Long> offsets = new HashMap<>();
+
+          for (TopicPartition topicPartition : consumer.assignment())
+          {
+            Integer partition = topicPartition.partition();
+            Long offset = consumer.position(topicPartition);
+            log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+            offsets.put(partition, offset);
+          }
+
+          repository.storeState(offsets);
+          stateStored = now;
+        }
       }
       catch (WakeupException e)
       {
@@ -105,29 +179,138 @@ public class TransferConsumer implements Runnable
           record.partition(),
           record.value());
     }
+    catch (IllegalArgumentException e)
+    {
+      log.error(
+          "ignoring invalid message #{} on {}/{}: {}, message={}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          e.getMessage(),
+          record.value());
+    }
+  }
+
+
+  /**
+   * Identifies the URI, at which the Group-Instance can be reached,
+   * that holds the state for a specific {@link Transfer}.
+   *
+   * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+   * {@code key} here and of type {@code String}, because this example
+   * project stores the key as a String in Kafka to simplify the listing
+   * and manual manipulation of the according topic.
+   *
+   * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+   * @return An {@link Optional}, that holds the URI at which the Group-Instance
+   * can be reached, that holds the state for the {@link Transfer}, that
+   * is identified by the key (if present), or is empty, if the {@link Transfer}
+   * would be handled by the local instance.
+   */
+  public Optional<String> uriForKey(String key)
+  {
+    synchronized (this)
+    {
+      while (partitionOwnershipUnknown)
+      {
+        try { wait(); } catch (InterruptedException e) {}
+      }
+
+      int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+      return
+          Optional
+              .ofNullable(instanceIdByPartition[partition])
+              .map(id -> instanceIdUriMapping.get(id));
+    }
   }

   @EventListener
   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
   {
-    // Needed, because this method is called synchronously during the
-    // initialization pahse of Spring. If the restoring is processed
+    // "Needed", because this method is called synchronously during the
+    // initialization phase of Spring. If the subscription happens
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = executorService.submit(() -> restore());
+    future = CompletableFuture.runAsync(() -> start());
+    log.info("start of application completed");
   }

-  private void restore()
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("--> starting restore...");
+    partitionOwnershipUnknown = true;
+    log.info("partitions revoked: {}", partitions);
+    for (TopicPartition topicPartition : partitions)
+    {
+      int partition = topicPartition.partition();
+      long offset = consumer.position(topicPartition);
+      log.info("deactivating partition {}, offset: {}", partition, offset);
+      repository.deactivatePartition(partition, offset);
+    }
+  }

-    List<TopicPartition> partitions =
-        consumer
-            .partitionsFor(topic)
-            .stream()
-            .map(info -> new TopicPartition(topic, info.partition()))
-            .collect(Collectors.toList());
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("partitions assigned: {}", partitions);
+    fetchAssignmentsAsync();
+    if (partitions.size() > 0)
+    {
+      for (TopicPartition topicPartition : partitions)
+      {
+        int partition = topicPartition.partition();
+        long offset = repository.activatePartition(partition);
+        log.info("activated partition {}, seeking to offset {}", partition, offset);
+        consumer.seek(topicPartition, offset);
+      }
+
+      restore(partitions);
+    }
+  }
+
+  private void fetchAssignmentsAsync()
+  {
+    adminClient
+        .describeConsumerGroups(List.of(groupId))
+        .describedGroups()
+        .get(groupId)
+        .whenComplete((descriptions, e) ->
+        {
+          if (e != null)
+          {
+            log.error("could not fetch group data: {}", e.getMessage());
+          }
+          else
+          {
+            synchronized (this)
+            {
+              for (MemberDescription description : descriptions.members())
+              {
+                description
+                    .assignment()
+                    .topicPartitions()
+                    .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+              }
+              partitionOwnershipUnknown = false;
+              notifyAll();
+            }
+          }
+        });
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions lost: {}", partitions);
+  }
+
+
+  private void restore(Collection<TopicPartition> partitions)
+  {
+    log.info("--> starting restore...");

     Map lastSeen =
         consumer
@@ -144,13 +327,9 @@ public class TransferConsumer implements Runnable
         .stream()
         .collect(Collectors.toMap(
             partition -> partition,
-            partition -> 0l));
-
-    log.info("assigning {}}", partitions);
-    consumer.assign(partitions);
+            partition -> repository.storedPosition(partition)));

     while (
-        restoring &&
         positions
           .entrySet()
           .stream()
@@ -173,49 +352,48 @@ public class TransferConsumer implements Runnable
       catch(WakeupException e)
       {
         log.info("--> cleanly interrupted while restoring");
-        return;
       }
     }

     log.info("--> restore completed!");
-    restoring = false;
-
-    // We are intentionally _not_ unsubscribing here, since that would
-    // reset the offset to _earliest_, because we disabled offset-commits.
-
-    start();
   }


   @PostMapping("start")
   public synchronized String start()
   {
-    if (restoring)
+    if (running)
     {
-      log.error("cannot start while restoring");
-      return "Denied: Restoring!";
+      log.info("consumer already running!");
+      return "Already running!";
     }

-    String result = "Started";
-
-    if (running)
+    int foundNumPartitions = consumer.partitionsFor(topic).size();
+    if (foundNumPartitions != numPartitions)
     {
-      stop();
-      result = "Restarted";
+      log.error(
+          "unexpected number of partitions for topic {}: expected={}, found={}",
+          topic,
+          numPartitions,
+          foundNumPartitions
+          );
+      return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
     }

+    consumer.subscribe(List.of(topic), this);
+
     running = true;
-    future = executorService.submit(this);
+    future = CompletableFuture.runAsync(this);

-    log.info("started");
-    return result;
+    log.info("consumer started");
+    return "Started";
   }

   @PostMapping("stop")
   public synchronized String stop()
   {
-    if (!(running || restoring))
+    if (!running)
     {
-      log.info("not running!");
+      log.info("consumer not running!");
       return "Not running";
     }
@@ -237,9 +415,10 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
+      consumer.unsubscribe();
     }

-    log.info("stopped");
+    log.info("consumer stopped");
     return "Stopped";
   }

@@ -253,6 +432,7 @@ public class TransferConsumer implements Runnable
   }


+
   public interface ConsumerUseCases
       extends
         GetTransferUseCase,
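
Usage note (illustration only, not part of the commit above): the new uriForKey() method lets the REST adapter route a request for a transfer to the group instance that owns the partition holding its state. The following sketch shows one way a controller could use it; the class name, the /transfer/{id} path and the temporary redirect are assumptions for illustration and do not claim to match the project's actual TransferController.

    import java.net.URI;

    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class TransferRoutingController
    {
      private final TransferConsumer consumer;

      public TransferRoutingController(TransferConsumer consumer)
      {
        this.consumer = consumer;
      }

      @GetMapping("/transfer/{id}")
      public ResponseEntity<Void> routeTransfer(@PathVariable String id)
      {
        // uriForKey() blocks until the partition ownership is known; it returns
        // the URI of the owning group instance, or an empty Optional if the
        // local instance owns the partition for this key.
        return consumer
            .uriForKey(id)
            // Another instance owns the partition: redirect the client there.
            .map(uri -> ResponseEntity
                .status(HttpStatus.TEMPORARY_REDIRECT)
                .location(URI.create(uri + "/transfer/" + id))
                .<Void>build())
            // Empty Optional: the local instance holds the state. A real adapter
            // would look the transfer up here (e.g. via GetTransferUseCase)
            // instead of answering with an empty 200.
            .orElseGet(() -> ResponseEntity.ok().<Void>build());
      }
    }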
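Partitioning note (illustration only, not part of the commit above): the routing in uriForKey() relies on TransferPartitioner.computeHashForKey(key, numPartitions) computing the same partition that the producer writes the transfer to; that class is not included in this diff. A key-to-partition mapping that is compatible with Kafka's default partitioner for non-null String keys could look like the sketch below; the class name and implementation are assumptions, not the project's actual code.

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.utils.Utils;

    public class HypotheticalTransferPartitioner
    {
      // Stand-in for TransferPartitioner.computeHashForKey(): maps a String key
      // to a partition number the same way Kafka's default partitioner handles
      // non-null keys (murmur2 hash, forced positive, modulo partition count).
      public static int computeHashForKey(String key, int numPartitions)
      {
        byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
      }
    }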