X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=920d79ae480b6bafdc889839fda7002abffb9ec5;hp=24f3e8893f259ba03d3d9f0823914804d070eefb;hb=19aec49a7f3a46f55e696a5a930c48883c4f1cd2;hpb=26809d379a0e024017f70db8c70382f94faf98b6 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 24f3e88..920d79a 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -6,118 +6,386 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; -import java.util.Set; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; - -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; +import java.util.stream.Collectors; @RequestMapping("/consumer") @ResponseBody -@RequiredArgsConstructor @Slf4j -public class TransferConsumer implements Runnable +public class TransferConsumer implements Runnable, ConsumerRebalanceListener { private final String topic; + private final int numPartitions; private final KafkaConsumer consumer; - private final ExecutorService executorService; + private final AdminClient adminClient; + private final TransferRepository repository; private final ObjectMapper mapper; - private final GetTransferUseCase getTransferUseCase; - private final CreateTransferUseCase createTransferUseCase; - private final HandleStateChangeUseCase handleStateChangeUseCase; + private final ConsumerUseCases productionUseCases, restoreUseCases; private boolean running = false; + private boolean shutdown = false; private Future future = null; + private final String groupId; + private final String groupInstanceId; + private final Map instanceIdUriMapping; + private final String[] instanceIdByPartition; + + private Clock clock; + private int stateStoreInterval; + + private volatile boolean partitionOwnershipUnknown = true; + + + public TransferConsumer( + String topic, + int numPartitions, + Map instanceIdUriMapping, + KafkaConsumer consumer, + AdminClient adminClient, + TransferRepository repository, + Clock clock, + int stateStoreInterval, + ObjectMapper mapper, + ConsumerUseCases productionUseCases, + ConsumerUseCases restoreUseCases) + { + this.topic = topic; + this.numPartitions = numPartitions; + this.groupId = consumer.groupMetadata().groupId(); + this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get(); + this.instanceIdByPartition = new String[numPartitions]; + this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size()); + for (String instanceId : instanceIdUriMapping.keySet()) + { + // Requests are not redirected for the instance itself + String uri = instanceId.equals(groupInstanceId) + ? null + : instanceIdUriMapping.get(instanceId); + this.instanceIdUriMapping.put(instanceId, uri); + } + this.consumer = consumer; + this.adminClient = adminClient; + this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; + this.mapper = mapper; + this.productionUseCases = productionUseCases; + this.restoreUseCases = restoreUseCases; + } + @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - if (records.count() > 0) - log.debug("polled {} records", records.count()); + if (records.count() == 0) + continue; - records.forEach(record -> + log.debug("polled {} records", records.count()); + records.forEach(record -> handleRecord(record, productionUseCases)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) { - try - { - byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; + Map offsets = new HashMap<>(); - switch (eventType) - { - case EventType.NEW_TRANSFER: - - NewTransferEvent newTransferEvent = - mapper.readValue(record.value(), NewTransferEvent.class); - createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED)); - break; - - case EventType.TRANSFER_STATE_CHANGED: - - TransferStateChangedEvent stateChangedEvent = - mapper.readValue(record.value(), TransferStateChangedEvent.class); - getTransferUseCase - .get(stateChangedEvent.getId()) - .ifPresentOrElse( - transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())), - () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); - break; - } - } - catch (JsonProcessingException e) + for (TopicPartition topicPartition : consumer.assignment()) { - log.error( - "ignoring invalid json in message #{} on {}/{}: {}", - record.offset(), - record.topic(), - record.partition(), - record.value()); + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); } - }); + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { - log.info("polling aborted!"); + log.info("cleanly interrupted while polling"); } } log.info("polling stopped"); } + private void handleRecord(ConsumerRecord record, ConsumerUseCases useCases) + { + try + { + byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; + + switch (eventType) + { + case EventType.NEW_TRANSFER: + + NewTransferEvent newTransferEvent = + mapper.readValue(record.value(), NewTransferEvent.class); + useCases + .create( + newTransferEvent.getId(), + newTransferEvent.getPayer(), + newTransferEvent.getPayee(), + newTransferEvent.getAmount()); + break; + + case EventType.TRANSFER_STATE_CHANGED: + + TransferStateChangedEvent stateChangedEvent = + mapper.readValue(record.value(), TransferStateChangedEvent.class); + useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState()); + break; + } + } + catch (JsonProcessingException e) + { + log.error( + "ignoring invalid json in message #{} on {}/{}: {}", + record.offset(), + record.topic(), + record.partition(), + record.value()); + } + catch (IllegalArgumentException e) + { + log.error( + "ignoring invalid message #{} on {}/{}: {}, message={}", + record.offset(), + record.topic(), + record.partition(), + e.getMessage(), + record.value()); + } + } + + + /** + * Identifies the URI, at which the Group-Instance can be reached, + * that holds the state for a specific {@link Transfer}. + * + * The {@link Transfer#getId() ID} of the {@link Transfer} is named + * {@code key} here and of type {@code String}, because this example + * project stores the key as a String in Kafka to simplify the listing + * and manual manipulation of the according topic. + * + * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}. + * @return An {@link Optional}, that holds the URI at which the Group-Instance + * can be reached, that holds the state for the {@link Transfer}, that + * is identified by the key (if present), or is empty, if the {@link Transfer} + * would be handled by the local instance. + */ + public Optional uriForKey(String key) + { + synchronized (this) + { + while (partitionOwnershipUnknown) + { + try { wait(); } catch (InterruptedException e) {} + } + + int partition = TransferPartitioner.computeHashForKey(key, numPartitions); + return + Optional + .ofNullable(instanceIdByPartition[partition]) + .map(id -> instanceIdUriMapping.get(id)); + } + } + + @EventListener + public synchronized void onApplicationEvent(ContextRefreshedEvent event) + { + // "Needed", because this method is called synchronously during the + // initialization pahse of Spring. If the subscription happens + // in the same thread, it would block the completion of the initialization. + // Hence, the app would not react to any signal (CTRL-C, for example) except + // a KILL until the restoring is finished. + future = CompletableFuture.runAsync(() -> start()); + log.info("start of application completed"); + } + + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("partitions assigned: {}", partitions); + fetchAssignmentsAsync(); + if (partitions.size() > 0) + { + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + } + + restore(partitions); + } + } + + private void fetchAssignmentsAsync() + { + adminClient + .describeConsumerGroups(List.of(groupId)) + .describedGroups() + .get(groupId) + .whenComplete((descriptions, e) -> + { + if (e != null) + { + log.error("could not fetch group data: {}", e.getMessage()); + } + else + { + synchronized (this) + { + for (MemberDescription description : descriptions.members()) + { + description + .assignment() + .topicPartitions() + .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get()); + } + partitionOwnershipUnknown = false; + notifyAll(); + } + } + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partiotions lost: {}", partitions); + } + + + private void restore(Collection partitions) + { + log.info("--> starting restore..."); + + Map lastSeen = + consumer + .endOffsets(partitions) + .entrySet() + .stream() + .collect(Collectors.toMap( + entry -> entry.getKey().partition(), + entry -> entry.getValue() - 1)); + + Map positions = + lastSeen + .keySet() + .stream() + .collect(Collectors.toMap( + partition -> partition, + partition -> repository.storedPosition(partition))); + + while ( + positions + .entrySet() + .stream() + .map(entry -> entry.getValue() < lastSeen.get(entry.getKey())) + .reduce(false, (a, b) -> a || b)) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + if (records.count() == 0) + continue; + + log.debug("polled {} records", records.count()); + records.forEach(record -> + { + handleRecord(record, restoreUseCases); + positions.put(record.partition(), record.offset()); + }); + } + catch(WakeupException e) + { + log.info("--> cleanly interrupted while restoring"); + } + } + + log.info("--> restore completed!"); + } @PostMapping("start") public synchronized String start() { - String result = "Started"; - if (running) { - stop(); - result = "Restarted"; + log.info("consumer already running!"); + return "Already running!"; + } + + int foundNumPartitions = consumer.partitionsFor(topic).size(); + if (foundNumPartitions != numPartitions) + { + log.error( + "unexpected number of partitions for topic {}: expected={}, found={}", + topic, + numPartitions, + foundNumPartitions + ); + return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions; } - log.info("subscribing to topic {}", topic); - consumer.subscribe(Set.of(topic)); + consumer.subscribe(List.of(topic), this); + running = true; - future = executorService.submit(this); + future = CompletableFuture.runAsync(this); - return result; + log.info("consumer started"); + return "Started"; } @PostMapping("stop") @@ -125,14 +393,16 @@ public class TransferConsumer implements Runnable { if (!running) { - log.info("not running!"); + log.info("consumer not running!"); return "Not running"; } running = false; + if (!future.isDone()) consumer.wakeup(); - log.info("waiting for the polling-loop to finish..."); + + log.info("waiting for the consumer..."); try { future.get(); @@ -145,18 +415,27 @@ public class TransferConsumer implements Runnable finally { future = null; - log.info("unsubscribing"); consumer.unsubscribe(); } - return "Stoped"; + log.info("consumer stopped"); + return "Stopped"; } public synchronized void shutdown() { log.info("shutdown initiated!"); + shutdown = true; stop(); log.info("closing consumer"); consumer.close(); } + + + + public interface ConsumerUseCases + extends + GetTransferUseCase, + CreateTransferUseCase, + HandleStateChangeUseCase {}; }