Simplified the thread-execution
authorKai Moritz <kai@juplo.de>
Sun, 20 Jun 2021 13:29:27 +0000 (15:29 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Jun 2021 13:29:27 +0000 (15:29 +0200)
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java

index 7e90c57..eab6abf 100644 (file)
@@ -23,8 +23,6 @@ import org.springframework.context.annotation.Bean;
 
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 
 @SpringBootApplication
@@ -57,17 +55,10 @@ public class TransferServiceApplication
     return new KafkaConsumer<>(props);
   }
 
-  @Bean(destroyMethod = "shutdown")
-  ExecutorService executorService()
-  {
-    return Executors.newFixedThreadPool(1);
-  }
-
   @Bean(destroyMethod = "shutdown")
   TransferConsumer transferConsumer(
       TransferServiceProperties properties,
       KafkaConsumer<String, String> consumer,
-      ExecutorService executorService,
       ObjectMapper mapper,
       TransferService productionTransferService,
       TransferService restoreTransferService)
@@ -76,7 +67,6 @@ public class TransferServiceApplication
         new TransferConsumer(
             properties.topic,
             consumer,
-            executorService,
             mapper,
             new TransferConsumer.ConsumerUseCases() {
               @Override
index 18f5383..1fd2689 100644 (file)
@@ -21,8 +21,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
@@ -35,7 +35,6 @@ public class TransferConsumer implements Runnable
 {
   private final String topic;
   private final KafkaConsumer<String, String> consumer;
-  private final ExecutorService executorService;
   private final ObjectMapper mapper;
   private final ConsumerUseCases productionUseCases, restoreUseCases;
 
@@ -125,7 +124,7 @@ public class TransferConsumer implements Runnable
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = executorService.submit(() -> restore());
+    future = CompletableFuture.runAsync(() -> restore());
   }
 
   private void restore()
@@ -214,7 +213,7 @@ public class TransferConsumer implements Runnable
     }
 
     running = true;
-    future = executorService.submit(this);
+    future = CompletableFuture.runAsync(this);
 
     log.info("started");
     return result;