TransferRepository does not need any synchronization
[demos/kafka/demos-kafka-payment-system-transfer] src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
new file mode 100644
index 0000000..17d91de
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
@@ -0,0 +1,154 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
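+/**
+ * Consumes transfer messages from Kafka and exposes the REST endpoints
+ * /consumer/start and /consumer/stop to control the polling loop at runtime.
+ */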
+@RequestMapping("/consumer")
+@ResponseBody
+@RequiredArgsConstructor
+@Slf4j
+public class TransferConsumer implements Runnable
+{
+  private final String topic;
+  private final KafkaConsumer<String, String> consumer;
+  private final ExecutorService executorService;
+  private final ObjectMapper mapper;
+  private final HandleTransferUseCase handleTransferUseCase;
+
+  private volatile boolean running = false; // written by start()/stop(), read by the polling thread
+  private Future<?> future = null;
+
+
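+  /**
+   * Polling loop: fetches batches of records, deserializes each record into a
+   * {@link Transfer} and hands it to the {@link HandleTransferUseCase}.
+   */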
+  @Override
+  public void run()
+  {
+    while (running)
+    {
+      try
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        log.debug("polled {} records", records.count());
+
+        records.forEach(record ->
+        {
+          try
+          {
+            Transfer transfer = mapper.readValue(record.value(), Transfer.class);
+            handleTransferUseCase.handle(transfer);
+          }
+          catch (JsonProcessingException e)
+          {
+            log.error(
+                "ignoring invalid json in message #{} on {}/{}: {}",
+                record.offset(),
+                record.topic(),
+                record.partition(),
+                record.value());
+          }
+        });
+      }
+      catch (WakeupException e)
+      {
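+        // wakeup() was called from stop(): leave poll() and re-check the running flag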
+        log.info("polling aborted!");
+      }
+    }
+
+    log.info("polling stopped");
+  }
+
+
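+  /**
+   * Subscribes the consumer to the topic and starts (or restarts) the polling loop.
+   */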
+  @PostMapping("start")
+  public synchronized String start()
+  {
+    String result = "Started";
+
+    if (running)
+    {
+      stop();
+      result = "Restarted";
+    }
+
+    log.info("subscribing to topic {}", topic);
+    consumer.subscribe(Set.of(topic));
+    running = true;
+    future = executorService.submit(this);
+
+    return result;
+  }
+
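+  /**
+   * Signals the polling loop to stop, waits for it to finish and unsubscribes the consumer.
+   */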
+  @PostMapping("stop")
+  public synchronized String stop()
+  {
+    if (!running)
+    {
+      log.info("not running!");
+      return "Not running";
+    }
+
+    running = false;
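+    // abort a potentially blocking poll() so that the loop can observe the flag change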
+    if (!future.isDone())
+      consumer.wakeup();
+    log.info("waiting for the polling-loop to finish...");
+    try
+    {
+      future.get();
+    }
+    catch (InterruptedException|ExecutionException e)
+    {
+      log.error("Exception while joining polling task!", e);
+      return e.getMessage();
+    }
+    finally
+    {
+      future = null;
+      log.info("unsubscribing");
+      consumer.unsubscribe();
+    }
+
+    return "Stoped";
+  }
+
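+  /**
+   * Stops the polling loop and closes the consumer on application shutdown.
+   */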
+  public synchronized void shutdown()
+  {
+    log.info("shutdown initiated!");
+    stop();
+    log.info("closing consumer");
+    consumer.close();
+  }
+}