diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
index 17d91de..2ef7ee3 100644
--- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
+++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
@@ -2,39 +2,91 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import java.time.Duration;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 
 @RequestMapping("/consumer")
 @ResponseBody
-@RequiredArgsConstructor
 @Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
+  private final int numPartitions;
   private final KafkaConsumer<String, String> consumer;
-  private final ExecutorService executorService;
+  private final AdminClient adminClient;
+  private final TransferRepository repository;
   private final ObjectMapper mapper;
-  private final HandleTransferUseCase handleTransferUseCase;
+  private final ConsumerUseCases productionUseCases, restoreUseCases;
 
   private boolean running = false;
+  private boolean shutdown = false;
   private Future<?> future = null;
 
+  private final String groupId;
+  private final String groupInstanceId;
+  private final Map<String, String> instanceIdUriMapping;
+  private final String[] instanceIdByPartition;
+
+  private volatile boolean partitionOwnershipUnknown = true;
+
+
+  public TransferConsumer(
+      String topic,
+      int numPartitions,
+      Map<String, String> instanceIdUriMapping,
+      KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
+      ObjectMapper mapper,
+      ConsumerUseCases productionUseCases,
+      ConsumerUseCases restoreUseCases)
+  {
+    this.topic = topic;
+    this.numPartitions = numPartitions;
+    this.groupId = consumer.groupMetadata().groupId();
+    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+    this.instanceIdByPartition = new String[numPartitions];
+    this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+    for (String instanceId : instanceIdUriMapping.keySet())
+    {
+      // Requests are not redirected for the instance itself
+      String uri = instanceId.equals(groupInstanceId)
+          ? null
+          : instanceIdUriMapping.get(instanceId);
+      this.instanceIdUriMapping.put(instanceId, uri);
+    }
+    this.consumer = consumer;
+    this.adminClient = adminClient;
+    this.repository = repository;
+    this.mapper = mapper;
+    this.productionUseCases = productionUseCases;
+    this.restoreUseCases = restoreUseCases;
+  }
+
 
   @Override
   public void run()
@@ -44,53 +96,238 @@ public class TransferConsumer implements Runnable
       try
       {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        if (records.count() == 0)
+          continue;
+
+        log.debug("polled {} records", records.count());
+        records.forEach(record -> handleRecord(record, productionUseCases));
+      }
+      catch (WakeupException e)
+      {
+        log.info("cleanly interrupted while polling");
+      }
+    }
 
-        records.forEach(record ->
+    log.info("polling stopped");
+  }
+
+  private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
+  {
+    try
+    {
+      byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+      switch (eventType)
+      {
+        case EventType.NEW_TRANSFER:
+
+          NewTransferEvent newTransferEvent =
+              mapper.readValue(record.value(), NewTransferEvent.class);
+          useCases
+              .create(
+                  newTransferEvent.getId(),
+                  newTransferEvent.getPayer(),
+                  newTransferEvent.getPayee(),
+                  newTransferEvent.getAmount());
+          break;
+
+        case EventType.TRANSFER_STATE_CHANGED:
+
+          TransferStateChangedEvent stateChangedEvent =
+              mapper.readValue(record.value(), TransferStateChangedEvent.class);
+          useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
+          break;
+      }
+    }
+    catch (JsonProcessingException e)
+    {
+      log.error(
+          "ignoring invalid json in message #{} on {}/{}: {}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.value());
+    }
+    catch (IllegalArgumentException e)
+    {
+      log.error(
+          "ignoring invalid message #{} on {}/{}: {}, message={}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          e.getMessage(),
+          record.value());
+    }
+  }
+
+
+  public Optional<String> uriForKey(String key)
+  {
+    synchronized (this)
+    {
+      while (partitionOwnershipUnknown)
+      {
+        try { wait(); } catch (InterruptedException e) {}
+      }
+
+      int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+      return
+          Optional
+              .ofNullable(instanceIdByPartition[partition])
+              .map(id -> instanceIdUriMapping.get(id));
+    }
+  }
+
+  @EventListener
+  public synchronized void onApplicationEvent(ContextRefreshedEvent event)
+  {
+    // "Needed", because this method is called synchronously during the
+    // initialization phase of Spring. If the subscription happened
+    // in the same thread, it would block the completion of the initialization.
+    // Hence, the app would not react to any signal (CTRL-C, for example) except
+    // a KILL until the restore is finished.
+    future = CompletableFuture.runAsync(() -> start());
+    log.info("start of application completed");
+  }
+
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions revoked: {}", partitions);
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("partitions assigned: {}", partitions);
+    fetchAssignmentsAsync();
+    if (partitions.size() > 0)
+      restore(partitions);
+  }
+
+  private void fetchAssignmentsAsync()
+  {
+    adminClient
+        .describeConsumerGroups(List.of(groupId))
+        .describedGroups()
+        .get(groupId)
+        .whenComplete((descriptions, e) ->
         {
-          try
+          if (e != null)
           {
-            Transfer transfer = mapper.readValue(record.value(), Transfer.class);
-            handleTransferUseCase.handle(transfer);
+            log.error("could not fetch group data: {}", e.getMessage());
           }
-          catch (JsonProcessingException e)
+          else
           {
-            log.error(
-                "ignoring invalid json in message #{} on {}/{}: {}",
-                record.offset(),
-                record.topic(),
-                record.partition(),
-                record.value());
+            synchronized (this)
+            {
+              for (MemberDescription description : descriptions.members())
+              {
+                description
+                    .assignment()
+                    .topicPartitions()
+                    .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+              }
+              partitionOwnershipUnknown = false;
+              notifyAll();
+            }
           }
         });
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions lost: {}", partitions);
+  }
+
+
+  private void restore(Collection<TopicPartition> partitions)
+  {
+    log.info("--> starting restore...");
+
+    partitions
+        .stream()
+        .map(topicPartition -> topicPartition.partition())
+        .forEach(partition -> repository.resetStorageForPartition(partition));
+
+    Map<Integer, Long> lastSeen =
+        consumer
+            .endOffsets(partitions)
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey().partition(),
+                entry -> entry.getValue() - 1));
+
+    Map<Integer, Long> positions =
+        lastSeen
+            .keySet()
+            .stream()
+            .collect(Collectors.toMap(
+                partition -> partition,
+                partition -> 0l));
+
+    while (
+        positions
+            .entrySet()
+            .stream()
+            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
+            .reduce(false, (a, b) -> a || b))
+    {
+      try
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        if (records.count() == 0)
+          continue;
+
+        log.debug("polled {} records", records.count());
+        records.forEach(record ->
+        {
+          handleRecord(record, restoreUseCases);
+          positions.put(record.partition(), record.offset());
+        });
       }
-      catch (WakeupException e)
+      catch(WakeupException e)
       {
-        log.info("polling aborted!");
+        log.info("--> cleanly interrupted while restoring");
       }
     }
 
-    log.info("polling stopped");
+    log.info("--> restore completed!");
   }
 
-
   @PostMapping("start")
   public synchronized String start()
   {
-    String result = "Started";
-
     if (running)
     {
-      stop();
-      result = "Restarted";
+      log.info("consumer already running!");
+      return "Already running!";
+    }
+
+    int foundNumPartitions = consumer.partitionsFor(topic).size();
+    if (foundNumPartitions != numPartitions)
+    {
+      log.error(
+          "unexpected number of partitions for topic {}: expected={}, found={}",
+          topic,
+          numPartitions,
+          foundNumPartitions
+      );
+      return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
     }
 
-    log.info("subscribing to topic {}", topic);
-    consumer.subscribe(Set.of(topic));
+    consumer.subscribe(List.of(topic), this);
+
     running = true;
-    future = executorService.submit(this);
+    future = CompletableFuture.runAsync(this);
 
-    return result;
+    log.info("consumer started");
+    return "Started";
   }
 
   @PostMapping("stop")
@@ -98,14 +335,16 @@ public class TransferConsumer implements Runnable
   {
     if (!running)
     {
-      log.info("not running!");
+      log.info("consumer not running!");
       return "Not running";
     }
 
     running = false;
+    if (!future.isDone())
       consumer.wakeup();
-    log.info("waiting for the polling-loop to finish...");
+
+    log.info("waiting for the consumer...");
     try
     {
       future.get();
@@ -118,18 +357,27 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
-      log.info("unsubscribing");
       consumer.unsubscribe();
     }
 
-    return "Stoped";
+    log.info("consumer stopped");
+    return "Stopped";
   }
 
   public synchronized void shutdown()
   {
     log.info("shutdown initiated!");
+    shutdown = true;
     stop();
     log.info("closing consumer");
     consumer.close();
   }
+
+
+
+  public interface ConsumerUseCases
+      extends
+        GetTransferUseCase,
+        CreateTransferUseCase,
+        HandleStateChangeUseCase {};
 }
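
Note on the new uriForKey() method: it blocks until the group's partition assignment has been fetched via the AdminClient and then yields the URI of the instance that currently owns the partition for the given key, or an empty Optional when that partition is owned by the local instance (the own group.instance.id is mapped to null in the constructor). A minimal sketch of how a REST adapter could use this to route requests to the owning instance; the controller, the /transfers/{id} route, the use of the string form of the transfer-id as record key and the GetTransferUseCase.get(long) signature are assumptions for illustration, not part of this diff:

import java.net.URI;
import java.util.Optional;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/transfers")            // hypothetical route, not taken from the diff
@RequiredArgsConstructor
public class TransferRoutingController
{
  private final TransferConsumer transferConsumer;
  private final GetTransferUseCase getTransferUseCase;

  @GetMapping("/{id}")
  public ResponseEntity<?> get(@PathVariable long id)
  {
    // Blocks while the partition ownership is still unknown; assumes records
    // are keyed with the string form of the transfer-id.
    Optional<String> uri = transferConsumer.uriForKey(Long.toString(id));

    if (uri.isPresent())
    {
      // Another instance owns the partition for this id: send the client there.
      // Assumes the configured instance-id/URI mapping holds the peers' base-URIs.
      return ResponseEntity
          .status(HttpStatus.TEMPORARY_REDIRECT)
          .location(URI.create(uri.get() + "/transfers/" + id))
          .build();
    }

    // Empty Optional: this instance owns the partition and answers from its
    // locally restored state.
    return getTransferUseCase
        .get(id)                         // assumed signature: Optional<Transfer> get(long id)
        .map(ResponseEntity::ok)
        .orElse(ResponseEntity.notFound().build());
  }
}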
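
The instanceIdByPartition lookup in uriForKey() is only correct if TransferPartitioner.computeHashForKey(key, numPartitions) resolves a key to the same partition that the producer wrote the corresponding events to. The actual implementation of TransferPartitioner is not part of this diff; a sketch of a key-to-partition mapping that stays consistent with Kafka's default partitioner for keyed records (murmur2 over the serialized key) could look like this:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KeyToPartitionMapping          // hypothetical helper, name not taken from the diff
{
  // Kafka's DefaultPartitioner maps keyed records via murmur2 over the
  // serialized key, masked to a positive int, modulo the partition count.
  public static int computeHashForKey(String key, int numPartitions)
  {
    byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8);   // StringSerializer default charset
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
  }
}

If the producing side uses a custom Partitioner instead, both sides should share that single implementation: any divergence silently routes requests to an instance that never sees the corresponding events.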