X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=501bfd0593a133bb3d3abeb7cc394f1710ac038c;hp=1fd26894908f5767a32872e5742b4b2bdaade4e9;hb=43ea59755f9673864a3ef95250009f091e99a760;hpb=cbfe4b796266ff7b9689fb69c5a8efee8ebb130a diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 1fd2689..501bfd0 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -5,8 +5,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -19,8 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.time.Duration; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -29,20 +31,62 @@ import java.util.stream.Collectors; @RequestMapping("/consumer") @ResponseBody -@RequiredArgsConstructor @Slf4j -public class TransferConsumer implements Runnable +public class TransferConsumer implements Runnable, ConsumerRebalanceListener { private final String topic; + private final int numPartitions; private final KafkaConsumer consumer; + private final AdminClient adminClient; + private final TransferRepository repository; private final ObjectMapper mapper; private final ConsumerUseCases productionUseCases, restoreUseCases; - private boolean restoring = true; private boolean running = false; private boolean shutdown = false; private Future future = null; + private final String groupId; + private final String groupInstanceId; + private final Map instanceIdUriMapping; + private final String[] instanceIdByPartition; + + private volatile boolean partitionOwnershipUnknown = true; + + + public TransferConsumer( + String topic, + int numPartitions, + Map instanceIdUriMapping, + KafkaConsumer consumer, + AdminClient adminClient, + TransferRepository repository, + ObjectMapper mapper, + ConsumerUseCases productionUseCases, + ConsumerUseCases restoreUseCases) + { + this.topic = topic; + this.numPartitions = numPartitions; + this.groupId = consumer.groupMetadata().groupId(); + this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get(); + this.instanceIdByPartition = new String[numPartitions]; + this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size()); + for (String instanceId : instanceIdUriMapping.keySet()) + { + // Requests are not redirected for the instance itself + String uri = instanceId.equals(groupInstanceId) + ? null + : instanceIdUriMapping.get(instanceId); + this.instanceIdUriMapping.put(instanceId, uri); + } + this.consumer = consumer; + this.adminClient = adminClient; + this.repository = repository; + this.mapper = mapper; + this.productionUseCases = productionUseCases; + this.restoreUseCases = restoreUseCases; + } + @Override public void run() @@ -116,27 +160,98 @@ public class TransferConsumer implements Runnable } } + + public Optional uriForKey(String key) + { + synchronized (this) + { + while (partitionOwnershipUnknown) + { + try { wait(); } catch (InterruptedException e) {} + } + + int partition = TransferPartitioner.computeHashForKey(key, numPartitions); + return + Optional + .ofNullable(instanceIdByPartition[partition]) + .map(id -> instanceIdUriMapping.get(id)); + } + } + @EventListener public synchronized void onApplicationEvent(ContextRefreshedEvent event) { - // Needed, because this method is called synchronously during the - // initialization pahse of Spring. If the restoring is processed + // "Needed", because this method is called synchronously during the + // initialization pahse of Spring. If the subscription happens // in the same thread, it would block the completion of the initialization. // Hence, the app would not react to any signal (CTRL-C, for example) except // a KILL until the restoring is finished. - future = CompletableFuture.runAsync(() -> restore()); + future = CompletableFuture.runAsync(() -> start()); + } + + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partitions revoked: {}", partitions); } - private void restore() + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("partitions assigned: {}", partitions); + fetchAssignmentsAsync(); + if (partitions.size() > 0) + restore(partitions); + } + + private void fetchAssignmentsAsync() + { + adminClient + .describeConsumerGroups(List.of(groupId)) + .describedGroups() + .get(groupId) + .whenComplete((descriptions, e) -> + { + if (e != null) + { + log.error("could not fetch group data: {}", e.getMessage()); + } + else + { + synchronized (this) + { + for (MemberDescription description : descriptions.members()) + { + description + .assignment() + .topicPartitions() + .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get()); + } + partitionOwnershipUnknown = false; + notifyAll(); + } + } + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partiotions lost: {}", partitions); + } + + + private void restore(Collection partitions) { log.info("--> starting restore..."); - List partitions = - consumer - .partitionsFor(topic) - .stream() - .map(info -> new TopicPartition(topic, info.partition())) - .collect(Collectors.toList()); + partitions + .stream() + .map(topicPartition -> topicPartition.partition()) + .forEach(partition -> repository.resetStorageForPartition(partition)); Map lastSeen = consumer @@ -155,11 +270,7 @@ public class TransferConsumer implements Runnable partition -> partition, partition -> 0l)); - log.info("assigning {}}", partitions); - consumer.assign(partitions); - while ( - restoring && positions .entrySet() .stream() @@ -182,47 +293,46 @@ public class TransferConsumer implements Runnable catch(WakeupException e) { log.info("--> cleanly interrupted while restoring"); - return; } } log.info("--> restore completed!"); - restoring = false; - - // We are intentionally _not_ unsubscribing here, since that would - // reset the offset to _earliest_, because we disabled offset-commits. - - start(); } @PostMapping("start") public synchronized String start() { - if (restoring) + if (running) { - log.error("cannot start while restoring"); - return "Denied: Restoring!"; + log.info("already running!"); + return "Already running!"; } - String result = "Started"; - - if (running) + int foundNumPartitions = consumer.partitionsFor(topic).size(); + if (foundNumPartitions != numPartitions) { - stop(); - result = "Restarted"; + log.error( + "unexpected number of partitions for topic {}: expected={}, found={}", + topic, + numPartitions, + foundNumPartitions + ); + return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions; } + consumer.subscribe(List.of(topic), this); + running = true; future = CompletableFuture.runAsync(this); log.info("started"); - return result; + return "Started"; } @PostMapping("stop") public synchronized String stop() { - if (!(running || restoring)) + if (!running) { log.info("not running!"); return "Not running"; @@ -246,6 +356,7 @@ public class TransferConsumer implements Runnable finally { future = null; + consumer.unsubscribe(); } log.info("stopped"); @@ -262,6 +373,7 @@ public class TransferConsumer implements Runnable } + public interface ConsumerUseCases extends GetTransferUseCase,