X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=251588d2ea0e6c9fb826265130c856f68685cc12;hp=24f3e8893f259ba03d3d9f0823914804d070eefb;hb=edc88d6eac8c502ab0297380489ccc9ba706b5f0;hpb=26809d379a0e024017f70db8c70382f94faf98b6 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 24f3e88..251588d 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -8,18 +8,24 @@ import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.time.Duration; -import java.util.Set; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; @@ -34,11 +40,11 @@ public class TransferConsumer implements Runnable private final KafkaConsumer consumer; private final ExecutorService executorService; private final ObjectMapper mapper; - private final GetTransferUseCase getTransferUseCase; - private final CreateTransferUseCase createTransferUseCase; - private final HandleStateChangeUseCase handleStateChangeUseCase; + private final ConsumerUseCases productionUseCases, restoreUseCases; + private boolean restoring = true; private boolean running = false; + private boolean shutdown = false; private Future future = null; @@ -50,60 +56,147 @@ public class TransferConsumer implements Runnable try { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - if (records.count() > 0) - log.debug("polled {} records", records.count()); + if (records.count() == 0) + continue; - records.forEach(record -> - { - try - { - byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; - - switch (eventType) - { - case EventType.NEW_TRANSFER: - - NewTransferEvent newTransferEvent = - mapper.readValue(record.value(), NewTransferEvent.class); - createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED)); - break; - - case EventType.TRANSFER_STATE_CHANGED: - - TransferStateChangedEvent stateChangedEvent = - mapper.readValue(record.value(), TransferStateChangedEvent.class); - getTransferUseCase - .get(stateChangedEvent.getId()) - .ifPresentOrElse( - transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())), - () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); - break; - } - } - catch (JsonProcessingException e) - { - log.error( - "ignoring invalid json in message #{} on {}/{}: {}", - record.offset(), - record.topic(), - record.partition(), - record.value()); - } - }); + log.debug("polled {} records", records.count()); + records.forEach(record -> handleRecord(record, productionUseCases)); } catch (WakeupException e) { - log.info("polling aborted!"); + log.info("cleanly interrupted while polling"); } } log.info("polling stopped"); } + private void handleRecord(ConsumerRecord record, ConsumerUseCases useCases) + { + try + { + byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; + + switch (eventType) + { + case EventType.NEW_TRANSFER: + + NewTransferEvent newTransferEvent = + mapper.readValue(record.value(), NewTransferEvent.class); + useCases.create(newTransferEvent.toTransfer().setState(CREATED)); + break; + + case EventType.TRANSFER_STATE_CHANGED: + + TransferStateChangedEvent stateChangedEvent = + mapper.readValue(record.value(), TransferStateChangedEvent.class); + useCases + .get(stateChangedEvent.getId()) + .ifPresentOrElse( + transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())), + () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + break; + } + } + catch (JsonProcessingException e) + { + log.error( + "ignoring invalid json in message #{} on {}/{}: {}", + record.offset(), + record.topic(), + record.partition(), + record.value()); + } + } + + @EventListener + public synchronized void onApplicationEvent(ContextRefreshedEvent event) + { + // Needed, because this method is called synchronously during the + // initialization pahse of Spring. If the restoring is processed + // in the same thread, it would block the completion of the initialization. + // Hence, the app would not react to any signal (CTRL-C, for example) except + // a KILL until the restoring is finished. + future = executorService.submit(() -> restore()); + } + + private void restore() + { + log.info("--> starting restore..."); + + List partitions = + consumer + .partitionsFor(topic) + .stream() + .map(info -> new TopicPartition(topic, info.partition())) + .collect(Collectors.toList()); + + Map lastSeen = + consumer + .endOffsets(partitions) + .entrySet() + .stream() + .collect(Collectors.toMap( + entry -> entry.getKey().partition(), + entry -> entry.getValue() - 1)); + + Map positions = + lastSeen + .keySet() + .stream() + .collect(Collectors.toMap( + partition -> partition, + partition -> 0l)); + + log.info("assigning {}}", partitions); + consumer.assign(partitions); + + while ( + restoring && + positions + .entrySet() + .stream() + .map(entry -> entry.getValue() < lastSeen.get(entry.getKey())) + .reduce(false, (a, b) -> a || b)) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + if (records.count() == 0) + continue; + + log.debug("polled {} records", records.count()); + records.forEach(record -> + { + handleRecord(record, restoreUseCases); + positions.put(record.partition(), record.offset()); + }); + } + catch(WakeupException e) + { + log.info("--> cleanly interrupted while restoring"); + return; + } + } + + log.info("--> restore completed!"); + restoring = false; + + // We are intentionally _not_ unsubscribing here, since that would + // reset the offset to _earliest_, because we disabled offset-commits. + + start(); + } @PostMapping("start") public synchronized String start() { + if (restoring) + { + log.error("cannot start while restoring"); + return "Denied: Restoring!"; + } + String result = "Started"; if (running) @@ -112,27 +205,28 @@ public class TransferConsumer implements Runnable result = "Restarted"; } - log.info("subscribing to topic {}", topic); - consumer.subscribe(Set.of(topic)); running = true; future = executorService.submit(this); + log.info("started"); return result; } @PostMapping("stop") public synchronized String stop() { - if (!running) + if (!(running || restoring)) { log.info("not running!"); return "Not running"; } running = false; + if (!future.isDone()) consumer.wakeup(); - log.info("waiting for the polling-loop to finish..."); + + log.info("waiting for the consumer..."); try { future.get(); @@ -145,18 +239,25 @@ public class TransferConsumer implements Runnable finally { future = null; - log.info("unsubscribing"); - consumer.unsubscribe(); } - return "Stoped"; + log.info("stopped"); + return "Stopped"; } public synchronized void shutdown() { log.info("shutdown initiated!"); + shutdown = true; stop(); log.info("closing consumer"); consumer.close(); } + + + public interface ConsumerUseCases + extends + GetTransferUseCase, + CreateTransferUseCase, + HandleStateChangeUseCase {}; }