Simplified the thread-execution
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 63fbef5..1fd2689 100644 (file)
@@ -21,8 +21,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
@@ -35,7 +35,6 @@ public class TransferConsumer implements Runnable
 {
   private final String topic;
   private final KafkaConsumer<String, String> consumer;
-  private final ExecutorService executorService;
   private final ObjectMapper mapper;
   private final ConsumerUseCases productionUseCases, restoreUseCases;
 
@@ -105,6 +104,16 @@ public class TransferConsumer implements Runnable
           record.partition(),
           record.value());
     }
+    catch (IllegalArgumentException e)
+    {
+      log.error(
+          "ignoring invalid message #{} on {}/{}: {}, message={}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          e.getMessage(),
+          record.value());
+    }
   }
 
   @EventListener
@@ -115,7 +124,7 @@ public class TransferConsumer implements Runnable
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = executorService.submit(() -> restore());
+    future = CompletableFuture.runAsync(() -> restore());
   }
 
   private void restore()
@@ -204,7 +213,7 @@ public class TransferConsumer implements Runnable
     }
 
     running = true;
-    future = executorService.submit(this);
+    future = CompletableFuture.runAsync(this);
 
     log.info("started");
     return result;