Automatically rebuild the state after a crash / restart
authorKai Moritz <kai@juplo.de>
Tue, 15 Jun 2021 18:27:20 +0000 (20:27 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 19 Jun 2021 16:19:52 +0000 (18:19 +0200)
* Turning off enable.auto.commit and setting auto.offset.reset to earliest
  does the trick in this setup.
* Since no offset-commits are made, all events are re-read on startup.
* The TransferConsumer uses a special instance of TransferService, that
  is initialized with a NoOpMessageService, to replay the restored
  events without generating new events.

src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/NoOpMessageService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java
src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java

index 58a3af2..259b62d 100644 (file)
@@ -3,9 +3,11 @@ package de.juplo.kafka.payment.transfer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
+import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
+import de.juplo.kafka.payment.transfer.adapter.TransferController;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.domain.TransferService;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -19,6 +21,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -46,6 +49,8 @@ public class TransferServiceApplication
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
@@ -64,23 +69,57 @@ public class TransferServiceApplication
       KafkaConsumer<String, String> consumer,
       ExecutorService executorService,
       ObjectMapper mapper,
-      TransferService transferService)
+      TransferService productionTransferService,
+      TransferService restoreTransferService)
   {
-    TransferConsumer transferConsumer =
+    return
         new TransferConsumer(
             properties.topic,
             consumer,
             executorService,
             mapper,
-            transferService,
-            transferService,
-            transferService);
-    transferConsumer.start();
-    return transferConsumer;
+            new TransferConsumer.ConsumerUseCases() {
+              @Override
+              public void create(Transfer transfer)
+              {
+                productionTransferService.create(transfer);
+              }
+
+              @Override
+              public Optional<Transfer> get(Long id)
+              {
+                return productionTransferService.get(id);
+              }
+
+              @Override
+              public void handle(Transfer transfer)
+              {
+                productionTransferService.handle(transfer);
+              }
+            },
+            new TransferConsumer.ConsumerUseCases() {
+              @Override
+              public void create(Transfer transfer)
+              {
+                restoreTransferService.create(transfer);
+              }
+
+              @Override
+              public Optional<Transfer> get(Long id)
+              {
+                return restoreTransferService.get(id);
+              }
+
+              @Override
+              public void handle(Transfer transfer)
+              {
+                restoreTransferService.handle(transfer);
+              }
+            });
   }
 
   @Bean
-  MessagingService kafkaMessagingService(
+  KafkaMessagingService kafkaMessagingService(
       KafkaProducer<String, String> producer,
       ObjectMapper mapper,
       TransferServiceProperties properties)
@@ -89,11 +128,27 @@ public class TransferServiceApplication
   }
 
   @Bean
-  TransferService transferService(
+  TransferService productionTransferService(
       TransferRepository repository,
-      MessagingService messagingService)
+      KafkaMessagingService kafkaMessagingService)
+  {
+    return new TransferService(repository, kafkaMessagingService);
+  }
+
+  @Bean
+  TransferService restoreTransferService(
+      TransferRepository repository,
+      NoOpMessageService noOpMessageService)
+  {
+    return new TransferService(repository, noOpMessageService);
+  }
+
+  @Bean
+  TransferController transferController(
+      TransferService productionTransferService,
+      KafkaMessagingService kafkaMessagingService)
   {
-    return new TransferService(repository, messagingService);
+    return new TransferController(productionTransferService, kafkaMessagingService);
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/NoOpMessageService.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NoOpMessageService.java
new file mode 100644 (file)
index 0000000..143abf1
--- /dev/null
@@ -0,0 +1,28 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.MessagingService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@Component
+@Slf4j
+public class NoOpMessageService implements MessagingService
+{
+  @Override
+  public CompletableFuture<?> send(Transfer transfer)
+  {
+    log.info("restoring transfer: {}", transfer);
+    return CompletableFuture.completedFuture(transfer.toString());
+  }
+
+  @Override
+  public CompletableFuture<?> send(Long id, Transfer.State state)
+  {
+    log.info("restoring state-change for transfer {}: {}", id, state);
+    return CompletableFuture.completedFuture("transfer: " + id + " - " + state);
+  }
+}
index 24f3e88..251588d 100644 (file)
@@ -8,18 +8,24 @@ import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import java.time.Duration;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
 
@@ -34,11 +40,11 @@ public class TransferConsumer implements Runnable
   private final KafkaConsumer<String, String> consumer;
   private final ExecutorService executorService;
   private final ObjectMapper mapper;
-  private final GetTransferUseCase getTransferUseCase;
-  private final CreateTransferUseCase createTransferUseCase;
-  private final HandleStateChangeUseCase handleStateChangeUseCase;
+  private final ConsumerUseCases productionUseCases, restoreUseCases;
 
+  private boolean restoring = true;
   private boolean running = false;
+  private boolean shutdown = false;
   private Future<?> future = null;
 
 
@@ -50,60 +56,147 @@ public class TransferConsumer implements Runnable
       try
       {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-        if (records.count() > 0)
-          log.debug("polled {} records", records.count());
+        if (records.count() == 0)
+          continue;
 
-        records.forEach(record ->
-        {
-          try
-          {
-            byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
-
-            switch (eventType)
-            {
-              case EventType.NEW_TRANSFER:
-
-                NewTransferEvent newTransferEvent =
-                    mapper.readValue(record.value(), NewTransferEvent.class);
-                createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
-                break;
-
-              case EventType.TRANSFER_STATE_CHANGED:
-
-                TransferStateChangedEvent stateChangedEvent =
-                    mapper.readValue(record.value(), TransferStateChangedEvent.class);
-                getTransferUseCase
-                    .get(stateChangedEvent.getId())
-                    .ifPresentOrElse(
-                        transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
-                        () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
-                break;
-            }
-          }
-          catch (JsonProcessingException e)
-          {
-            log.error(
-                "ignoring invalid json in message #{} on {}/{}: {}",
-                record.offset(),
-                record.topic(),
-                record.partition(),
-                record.value());
-          }
-        });
+        log.debug("polled {} records", records.count());
+        records.forEach(record -> handleRecord(record, productionUseCases));
       }
       catch (WakeupException e)
       {
-        log.info("polling aborted!");
+        log.info("cleanly interrupted while polling");
       }
     }
 
     log.info("polling stopped");
   }
 
+  private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
+  {
+    try
+    {
+      byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+      switch (eventType)
+      {
+        case EventType.NEW_TRANSFER:
+
+          NewTransferEvent newTransferEvent =
+              mapper.readValue(record.value(), NewTransferEvent.class);
+          useCases.create(newTransferEvent.toTransfer().setState(CREATED));
+          break;
+
+        case EventType.TRANSFER_STATE_CHANGED:
+
+          TransferStateChangedEvent stateChangedEvent =
+              mapper.readValue(record.value(), TransferStateChangedEvent.class);
+          useCases
+              .get(stateChangedEvent.getId())
+              .ifPresentOrElse(
+                  transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())),
+                  () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+          break;
+      }
+    }
+    catch (JsonProcessingException e)
+    {
+      log.error(
+          "ignoring invalid json in message #{} on {}/{}: {}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.value());
+    }
+  }
+
+  @EventListener
+  public synchronized void onApplicationEvent(ContextRefreshedEvent event)
+  {
+    // Needed, because this method is called synchronously during the
+    // initialization phase of Spring. If the restoring is processed
+    // in the same thread, it would block the completion of the initialization.
+    // Hence, the app would not react to any signal (CTRL-C, for example) except
+    // a KILL until the restoring is finished.
+    future = executorService.submit(() -> restore());
+  }
+
+  private void restore()
+  {
+    log.info("--> starting restore...");
+
+    List<TopicPartition> partitions =
+        consumer
+            .partitionsFor(topic)
+            .stream()
+            .map(info -> new TopicPartition(topic, info.partition()))
+            .collect(Collectors.toList());
+
+    Map<Integer, Long> lastSeen =
+        consumer
+            .endOffsets(partitions)
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey().partition(),
+                entry -> entry.getValue() - 1));
+
+    Map<Integer, Long> positions =
+        lastSeen
+            .keySet()
+            .stream()
+            .collect(Collectors.toMap(
+                partition -> partition,
+                partition -> 0L));
+
+    log.info("assigning {}", partitions);
+    consumer.assign(partitions);
+
+    while (
+        restoring &&
+        positions
+            .entrySet()
+            .stream()
+            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
+            .reduce(false, (a, b) -> a || b))
+    {
+      try
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        if (records.count() == 0)
+          continue;
+
+        log.debug("polled {} records", records.count());
+        records.forEach(record ->
+        {
+          handleRecord(record, restoreUseCases);
+          positions.put(record.partition(), record.offset());
+        });
+      }
+      catch(WakeupException e)
+      {
+        log.info("--> cleanly interrupted while restoring");
+        return;
+      }
+    }
+
+    log.info("--> restore completed!");
+    restoring = false;
+
+    // We are intentionally _not_ unsubscribing here, since that would
+    // reset the offset to _earliest_, because we disabled offset-commits.
+
+    start();
+  }
 
   @PostMapping("start")
   public synchronized String start()
   {
+    if (restoring)
+    {
+      log.error("cannot start while restoring");
+      return "Denied: Restoring!";
+    }
+
     String result = "Started";
 
     if (running)
@@ -112,27 +205,28 @@ public class TransferConsumer implements Runnable
       result = "Restarted";
     }
 
-    log.info("subscribing to topic {}", topic);
-    consumer.subscribe(Set.of(topic));
     running = true;
     future = executorService.submit(this);
 
+    log.info("started");
     return result;
   }
 
   @PostMapping("stop")
   public synchronized String stop()
   {
-    if (!running)
+    if (!(running || restoring))
     {
       log.info("not running!");
       return "Not running";
     }
 
     running = false;
+
     if (!future.isDone())
       consumer.wakeup();
-    log.info("waiting for the polling-loop to finish...");
+
+    log.info("waiting for the consumer...");
     try
     {
       future.get();
@@ -145,18 +239,25 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
-      log.info("unsubscribing");
-      consumer.unsubscribe();
     }
 
-    return "Stoped";
+    log.info("stopped");
+    return "Stopped";
   }
 
   public synchronized void shutdown()
   {
     log.info("shutdown initiated!");
+    shutdown = true;
     stop();
     log.info("closing consumer");
     consumer.close();
   }
+
+
+  public interface ConsumerUseCases
+      extends
+        GetTransferUseCase,
+        CreateTransferUseCase,
+        HandleStateChangeUseCase {};
 }
index b9a2fb0..5f30df6 100644 (file)
@@ -24,7 +24,8 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 
-@RestController
+@RequestMapping(TransferController.PATH)
+@ResponseBody
 @RequiredArgsConstructor
 @Slf4j
  public class TransferController
@@ -36,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
 
 
   @PostMapping(
-      path = PATH,
+      path = "",
       consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.APPLICATION_JSON_VALUE)
   public DeferredResult<ResponseEntity<?>> transfer(
@@ -85,7 +86,7 @@ import java.util.concurrent.CompletableFuture;
   }
 
   @GetMapping(
-      path = PATH + "/{id}",
+      path = "/{id}",
       produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<TransferDTO> get(@PathVariable Long id)
   {
index 7a2349a..00c5478 100644 (file)
@@ -7,7 +7,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.Optional;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
 
 
 @Slf4j
@@ -26,6 +27,7 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange
             stored -> log.info("transfer already exisits: {}, ignoring: {}", stored, transfer),
             () ->
             {
+              log.info("creating transfer: {}", transfer);
               repository.store(transfer);
               messagingService.send(transfer.getId(), CREATED);
             });