From: Kai Moritz Date: Tue, 15 Jun 2021 18:27:20 +0000 (+0200) Subject: Automatically rebuild the state after a crash / restart X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=edc88d6eac8c502ab0297380489ccc9ba706b5f0;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer Automatically rebuild the state after a crash / restart * Turning of enable.auto.commit and setting auto.offset.reset to earliest does the trick in this setup. * Since no offset-commits are made, all events are re-read on startup. * The TransferConsumer uses a special instance of TransferService, that is initialized with a NoOpMessageService, to replay the restored events without generating new events. --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 58a3af2..259b62d 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -3,9 +3,11 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; +import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService; import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; +import de.juplo.kafka.payment.transfer.adapter.TransferController; +import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.domain.TransferService; -import de.juplo.kafka.payment.transfer.ports.MessagingService; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -19,6 +21,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -46,6 +49,8 @@ public class TransferServiceApplication Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -64,23 +69,57 @@ public class TransferServiceApplication KafkaConsumer consumer, ExecutorService executorService, ObjectMapper mapper, - TransferService transferService) + TransferService productionTransferService, + TransferService restoreTransferService) { - TransferConsumer transferConsumer = + return new TransferConsumer( properties.topic, consumer, executorService, mapper, - transferService, - transferService, - transferService); - transferConsumer.start(); - return transferConsumer; + new TransferConsumer.ConsumerUseCases() { + @Override + public void create(Transfer transfer) + { + productionTransferService.create(transfer); + } + + @Override + public Optional get(Long id) + { + return productionTransferService.get(id); + } + + @Override + public void handle(Transfer transfer) + { + productionTransferService.handle(transfer); + } + }, + new TransferConsumer.ConsumerUseCases() { + @Override + public void create(Transfer transfer) + { + restoreTransferService.create(transfer); + } + + @Override + public Optional get(Long id) + { + return restoreTransferService.get(id); + } + + @Override + public void handle(Transfer transfer) + { + restoreTransferService.handle(transfer); + } + }); } @Bean - MessagingService kafkaMessagingService( + KafkaMessagingService kafkaMessagingService( KafkaProducer producer, ObjectMapper mapper, TransferServiceProperties properties) @@ -89,11 +128,27 @@ public class TransferServiceApplication } @Bean - TransferService transferService( + TransferService productionTransferService( TransferRepository repository, - MessagingService messagingService) + KafkaMessagingService kafkaMessagingService) + { + return new TransferService(repository, kafkaMessagingService); + } + + @Bean + TransferService restoreTransferService( + TransferRepository repository, + NoOpMessageService noOpMessageService) + { + return new TransferService(repository, noOpMessageService); + } + + @Bean + TransferController transferController( + TransferService productionTransferService, + KafkaMessagingService kafkaMessagingService) { - return new TransferService(repository, messagingService); + return new TransferController(productionTransferService, kafkaMessagingService); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/NoOpMessageService.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NoOpMessageService.java new file mode 100644 index 0000000..143abf1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NoOpMessageService.java @@ -0,0 +1,28 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.ports.MessagingService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + + +@Component +@Slf4j +public class NoOpMessageService implements MessagingService +{ + @Override + public CompletableFuture send(Transfer transfer) + { + log.info("restoring transfer: {}", transfer); + return CompletableFuture.completedFuture(transfer.toString()); + } + + @Override + public CompletableFuture send(Long id, Transfer.State state) + { + log.info("restoring state-change for transfer {}: {}", id, state); + return CompletableFuture.completedFuture("transfer: " + id + " - " + state); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 24f3e88..251588d 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -8,18 +8,24 @@ import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.time.Duration; -import java.util.Set; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; @@ -34,11 +40,11 @@ public class TransferConsumer implements Runnable private final KafkaConsumer consumer; private final ExecutorService executorService; private final ObjectMapper mapper; - private final GetTransferUseCase getTransferUseCase; - private final CreateTransferUseCase createTransferUseCase; - private final HandleStateChangeUseCase handleStateChangeUseCase; + private final ConsumerUseCases productionUseCases, restoreUseCases; + private boolean restoring = true; private boolean running = false; + private boolean shutdown = false; private Future future = null; @@ -50,60 +56,147 @@ public class TransferConsumer implements Runnable try { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - if (records.count() > 0) - log.debug("polled {} records", records.count()); + if (records.count() == 0) + continue; - records.forEach(record -> - { - try - { - byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; - - switch (eventType) - { - case EventType.NEW_TRANSFER: - - NewTransferEvent newTransferEvent = - mapper.readValue(record.value(), NewTransferEvent.class); - createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED)); - break; - - case EventType.TRANSFER_STATE_CHANGED: - - TransferStateChangedEvent stateChangedEvent = - mapper.readValue(record.value(), TransferStateChangedEvent.class); - getTransferUseCase - .get(stateChangedEvent.getId()) - .ifPresentOrElse( - transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())), - () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); - break; - } - } - catch (JsonProcessingException e) - { - log.error( - "ignoring invalid json in message #{} on {}/{}: {}", - record.offset(), - record.topic(), - record.partition(), - record.value()); - } - }); + log.debug("polled {} records", records.count()); + records.forEach(record -> handleRecord(record, productionUseCases)); } catch (WakeupException e) { - log.info("polling aborted!"); + log.info("cleanly interrupted while polling"); } } log.info("polling stopped"); } + private void handleRecord(ConsumerRecord record, ConsumerUseCases useCases) + { + try + { + byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; + + switch (eventType) + { + case EventType.NEW_TRANSFER: + + NewTransferEvent newTransferEvent = + mapper.readValue(record.value(), NewTransferEvent.class); + useCases.create(newTransferEvent.toTransfer().setState(CREATED)); + break; + + case EventType.TRANSFER_STATE_CHANGED: + + TransferStateChangedEvent stateChangedEvent = + mapper.readValue(record.value(), TransferStateChangedEvent.class); + useCases + .get(stateChangedEvent.getId()) + .ifPresentOrElse( + transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())), + () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + break; + } + } + catch (JsonProcessingException e) + { + log.error( + "ignoring invalid json in message #{} on {}/{}: {}", + record.offset(), + record.topic(), + record.partition(), + record.value()); + } + } + + @EventListener + public synchronized void onApplicationEvent(ContextRefreshedEvent event) + { + // Needed, because this method is called synchronously during the + // initialization pahse of Spring. If the restoring is processed + // in the same thread, it would block the completion of the initialization. + // Hence, the app would not react to any signal (CTRL-C, for example) except + // a KILL until the restoring is finished. + future = executorService.submit(() -> restore()); + } + + private void restore() + { + log.info("--> starting restore..."); + + List partitions = + consumer + .partitionsFor(topic) + .stream() + .map(info -> new TopicPartition(topic, info.partition())) + .collect(Collectors.toList()); + + Map lastSeen = + consumer + .endOffsets(partitions) + .entrySet() + .stream() + .collect(Collectors.toMap( + entry -> entry.getKey().partition(), + entry -> entry.getValue() - 1)); + + Map positions = + lastSeen + .keySet() + .stream() + .collect(Collectors.toMap( + partition -> partition, + partition -> 0l)); + + log.info("assigning {}}", partitions); + consumer.assign(partitions); + + while ( + restoring && + positions + .entrySet() + .stream() + .map(entry -> entry.getValue() < lastSeen.get(entry.getKey())) + .reduce(false, (a, b) -> a || b)) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + if (records.count() == 0) + continue; + + log.debug("polled {} records", records.count()); + records.forEach(record -> + { + handleRecord(record, restoreUseCases); + positions.put(record.partition(), record.offset()); + }); + } + catch(WakeupException e) + { + log.info("--> cleanly interrupted while restoring"); + return; + } + } + + log.info("--> restore completed!"); + restoring = false; + + // We are intentionally _not_ unsubscribing here, since that would + // reset the offset to _earliest_, because we disabled offset-commits. + + start(); + } @PostMapping("start") public synchronized String start() { + if (restoring) + { + log.error("cannot start while restoring"); + return "Denied: Restoring!"; + } + String result = "Started"; if (running) @@ -112,27 +205,28 @@ public class TransferConsumer implements Runnable result = "Restarted"; } - log.info("subscribing to topic {}", topic); - consumer.subscribe(Set.of(topic)); running = true; future = executorService.submit(this); + log.info("started"); return result; } @PostMapping("stop") public synchronized String stop() { - if (!running) + if (!(running || restoring)) { log.info("not running!"); return "Not running"; } running = false; + if (!future.isDone()) consumer.wakeup(); - log.info("waiting for the polling-loop to finish..."); + + log.info("waiting for the consumer..."); try { future.get(); @@ -145,18 +239,25 @@ public class TransferConsumer implements Runnable finally { future = null; - log.info("unsubscribing"); - consumer.unsubscribe(); } - return "Stoped"; + log.info("stopped"); + return "Stopped"; } public synchronized void shutdown() { log.info("shutdown initiated!"); + shutdown = true; stop(); log.info("closing consumer"); consumer.close(); } + + + public interface ConsumerUseCases + extends + GetTransferUseCase, + CreateTransferUseCase, + HandleStateChangeUseCase {}; } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java index b9a2fb0..5f30df6 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -24,7 +24,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -@RestController +@RequestMapping(TransferController.PATH) +@ResponseBody @RequiredArgsConstructor @Slf4j public class TransferController @@ -36,7 +37,7 @@ import java.util.concurrent.CompletableFuture; @PostMapping( - path = PATH, + path = "", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public DeferredResult> transfer( @@ -85,7 +86,7 @@ import java.util.concurrent.CompletableFuture; } @GetMapping( - path = PATH + "/{id}", + path = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity get(@PathVariable Long id) { diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 7a2349a..00c5478 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -7,7 +7,8 @@ import lombok.extern.slf4j.Slf4j; import java.util.Optional; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; @Slf4j @@ -26,6 +27,7 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer), () -> { + log.info("creating transfer: {}", transfer); repository.store(transfer); messagingService.send(transfer.getId(), CREATED); });