X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=1cae540ff24f5a2d7d2dea4f10e182060cd871c5;hp=251588d2ea0e6c9fb826265130c856f68685cc12;hb=9b5a2fb9d42baeb4ddde2ac146e2f8e61a1fc550;hpb=edc88d6eac8c502ab0297380489ccc9ba706b5f0 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 251588d..1cae540 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -6,8 +6,11 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -19,38 +22,93 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; -import java.util.List; -import java.util.Map; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.stream.Collectors; - -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; +import java.util.function.Consumer; @RequestMapping("/consumer") @ResponseBody -@RequiredArgsConstructor @Slf4j -public class TransferConsumer implements Runnable +public class TransferConsumer implements Runnable, ConsumerRebalanceListener { private final String topic; + private final int numPartitions; private final KafkaConsumer consumer; - private final ExecutorService executorService; + private final AdminClient adminClient; + private final TransferRepository repository; private final ObjectMapper mapper; - private final ConsumerUseCases productionUseCases, restoreUseCases; + private final ConsumerUseCases restoreUseCases; - private boolean restoring = true; private boolean running = false; private boolean shutdown = false; private Future future = null; + private final String groupId; + private final String groupInstanceId; + private final Map instanceIdUriMapping; + private final String[] instanceIdByPartition; + + + private Clock clock; + private int stateStoreInterval; + + private final Consumer> productionRecordHandler; + private final Consumer> recordHandlers[]; + + private volatile boolean partitionOwnershipUnknown = true; + + + public TransferConsumer( + String topic, + int numPartitions, + Map instanceIdUriMapping, + KafkaConsumer consumer, + AdminClient adminClient, + TransferRepository repository, + Clock clock, + int stateStoreInterval, + ObjectMapper mapper, + ConsumerUseCases productionUseCases, + ConsumerUseCases restoreUseCases) + { + this.topic = topic; + this.numPartitions = numPartitions; + this.groupId = consumer.groupMetadata().groupId(); + this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get(); + this.instanceIdByPartition = new String[numPartitions]; + this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size()); + for (String instanceId : instanceIdUriMapping.keySet()) + { + // Requests are not redirected for the instance itself + String uri = instanceId.equals(groupInstanceId) + ? null + : instanceIdUriMapping.get(instanceId); + this.instanceIdUriMapping.put(instanceId, uri); + } + this.consumer = consumer; + this.adminClient = adminClient; + this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; + this.mapper = mapper; + this.restoreUseCases = restoreUseCases; + + productionRecordHandler = (record) -> handleRecord(record, productionUseCases); + this.recordHandlers = new Consumer[numPartitions]; + } + @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try @@ -60,7 +118,26 @@ public class TransferConsumer implements Runnable continue; log.debug("polled {} records", records.count()); - records.forEach(record -> handleRecord(record, productionUseCases)); + records.forEach(record -> recordHandlers[record.partition()].accept(record)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) + { + Map offsets = new HashMap<>(); + + for (TopicPartition topicPartition : consumer.assignment()) + { + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); + } + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { @@ -83,18 +160,19 @@ public class TransferConsumer implements Runnable NewTransferEvent newTransferEvent = mapper.readValue(record.value(), NewTransferEvent.class); - useCases.create(newTransferEvent.toTransfer().setState(CREATED)); + useCases + .create( + newTransferEvent.getId(), + newTransferEvent.getPayer(), + newTransferEvent.getPayee(), + newTransferEvent.getAmount()); break; case EventType.TRANSFER_STATE_CHANGED: TransferStateChangedEvent stateChangedEvent = mapper.readValue(record.value(), TransferStateChangedEvent.class); - useCases - .get(stateChangedEvent.getId()) - .ifPresentOrElse( - transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())), - () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState()); break; } } @@ -107,117 +185,220 @@ public class TransferConsumer implements Runnable record.partition(), record.value()); } + catch (IllegalArgumentException e) + { + log.error( + "ignoring invalid message #{} on {}/{}: {}, message={}", + record.offset(), + record.topic(), + record.partition(), + e.getMessage(), + record.value()); + } + } + + + /** + * Identifies the URI, at which the Group-Instance can be reached, + * that holds the state for a specific {@link Transfer}. + * + * The {@link Transfer#getId() ID} of the {@link Transfer} is named + * {@code key} here and of type {@code String}, because this example + * project stores the key as a String in Kafka to simplify the listing + * and manual manipulation of the according topic. + * + * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}. + * @return An {@link Optional}, that holds the URI at which the Group-Instance + * can be reached, that holds the state for the {@link Transfer}, that + * is identified by the key (if present), or is empty, if the {@link Transfer} + * would be handled by the local instance. + */ + public Optional uriForKey(String key) + { + synchronized (this) + { + while (partitionOwnershipUnknown) + { + try { wait(); } catch (InterruptedException e) {} + } + + int partition = TransferPartitioner.computeHashForKey(key, numPartitions); + return + Optional + .ofNullable(instanceIdByPartition[partition]) + .map(id -> instanceIdUriMapping.get(id)); + } } @EventListener public synchronized void onApplicationEvent(ContextRefreshedEvent event) { - // Needed, because this method is called synchronously during the - // initialization pahse of Spring. If the restoring is processed + // "Needed", because this method is called synchronously during the + // initialization pahse of Spring. If the subscription happens // in the same thread, it would block the completion of the initialization. // Hence, the app would not react to any signal (CTRL-C, for example) except // a KILL until the restoring is finished. - future = executorService.submit(() -> restore()); + future = CompletableFuture.runAsync(() -> start()); + log.info("start of application completed"); } - private void restore() + + @Override + public void onPartitionsRevoked(Collection partitions) { - log.info("--> starting restore..."); - - List partitions = - consumer - .partitionsFor(topic) - .stream() - .map(info -> new TopicPartition(topic, info.partition())) - .collect(Collectors.toList()); - - Map lastSeen = - consumer - .endOffsets(partitions) - .entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey().partition(), - entry -> entry.getValue() - 1)); - - Map positions = - lastSeen - .keySet() - .stream() - .collect(Collectors.toMap( - partition -> partition, - partition -> 0l)); - - log.info("assigning {}}", partitions); - consumer.assign(partitions); - - while ( - restoring && - positions - .entrySet() - .stream() - .map(entry -> entry.getValue() < lastSeen.get(entry.getKey())) - .reduce(false, (a, b) -> a || b)) + partitionOwnershipUnknown = true; + log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) { - try + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("partitions assigned: {}", partitions); + fetchAssignmentsAsync(); + if (partitions.size() > 0) + { + for (Map.Entry entry : consumer.endOffsets(partitions).entrySet()) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - if (records.count() == 0) - continue; + TopicPartition topicPartition = entry.getKey(); + Integer partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + Long endOffset = entry.getValue(); + if (offset < endOffset) + { + log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset); + recordHandlers[partition] = new RestoreRecordHandler(endOffset); + } + else + { + log.info("--> partition {} is up-to-date, offset: {}", partition, offset); + recordHandlers[partition] = productionRecordHandler; + } + } + } + } - log.debug("polled {} records", records.count()); - records.forEach(record -> + private void fetchAssignmentsAsync() + { + adminClient + .describeConsumerGroups(List.of(groupId)) + .describedGroups() + .get(groupId) + .whenComplete((descriptions, e) -> { - handleRecord(record, restoreUseCases); - positions.put(record.partition(), record.offset()); + if (e != null) + { + log.error("could not fetch group data: {}", e.getMessage()); + } + else + { + synchronized (this) + { + for (MemberDescription description : descriptions.members()) + { + description + .assignment() + .topicPartitions() + .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get()); + } + partitionOwnershipUnknown = false; + notifyAll(); + } + } }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partiotions lost: {}", partitions); + } + + + class RestoreRecordHandler implements Consumer> + { + final long seen; + + + RestoreRecordHandler(Long endOffset) + { + this.seen = endOffset - 1; + } + + + @Override + public void accept(ConsumerRecord record) + { + if (seen < record.offset()) + { + int partition = record.partition(); + log.info( + "--> restore of partition {} completed: needed={}, seen={}!", + partition, + seen, + record.offset()); + recordHandlers[partition] = productionRecordHandler; + productionRecordHandler.accept(record); } - catch(WakeupException e) + else { - log.info("--> cleanly interrupted while restoring"); - return; + handleRecord(record, restoreUseCases); + if (seen == record.offset()) + { + int partition = record.partition(); + log.info( "--> restore of partition {} completed!", partition); + recordHandlers[partition] = productionRecordHandler; + } } } - - log.info("--> restore completed!"); - restoring = false; - - // We are intentionally _not_ unsubscribing here, since that would - // reset the offset to _earliest_, because we disabled offset-commits. - - start(); } + @PostMapping("start") public synchronized String start() { - if (restoring) + if (running) { - log.error("cannot start while restoring"); - return "Denied: Restoring!"; + log.info("consumer already running!"); + return "Already running!"; } - String result = "Started"; - - if (running) + int foundNumPartitions = consumer.partitionsFor(topic).size(); + if (foundNumPartitions != numPartitions) { - stop(); - result = "Restarted"; + log.error( + "unexpected number of partitions for topic {}: expected={}, found={}", + topic, + numPartitions, + foundNumPartitions + ); + return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions; } + consumer.subscribe(List.of(topic), this); + running = true; - future = executorService.submit(this); + future = CompletableFuture.runAsync(this); - log.info("started"); - return result; + log.info("consumer started"); + return "Started"; } @PostMapping("stop") public synchronized String stop() { - if (!(running || restoring)) + if (!running) { - log.info("not running!"); + log.info("consumer not running!"); return "Not running"; } @@ -239,9 +420,10 @@ public class TransferConsumer implements Runnable finally { future = null; + consumer.unsubscribe(); } - log.info("stopped"); + log.info("consumer stopped"); return "Stopped"; } @@ -255,6 +437,7 @@ public class TransferConsumer implements Runnable } + public interface ConsumerUseCases extends GetTransferUseCase,