From: Kai Moritz Date: Sun, 20 Jun 2021 13:29:27 +0000 (+0200) Subject: Simplified the thread-execution X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=commitdiff_plain;h=90762b3c47fc63734707e52acfaeb8e427089f41 Simplified the thread-execution --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 7e90c57..eab6abf 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -23,8 +23,6 @@ import org.springframework.context.annotation.Bean; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @SpringBootApplication @@ -57,17 +55,10 @@ public class TransferServiceApplication return new KafkaConsumer<>(props); } - @Bean(destroyMethod = "shutdown") - ExecutorService executorService() - { - return Executors.newFixedThreadPool(1); - } - @Bean(destroyMethod = "shutdown") TransferConsumer transferConsumer( TransferServiceProperties properties, KafkaConsumer consumer, - ExecutorService executorService, ObjectMapper mapper, TransferService productionTransferService, TransferService restoreTransferService) @@ -76,7 +67,6 @@ public class TransferServiceApplication new TransferConsumer( properties.topic, consumer, - executorService, mapper, new TransferConsumer.ConsumerUseCases() { @Override diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 18f5383..1fd2689 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -21,8 +21,8 @@ import org.springframework.web.bind.annotation.ResponseBody; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -35,7 +35,6 @@ public class TransferConsumer implements Runnable { private final String topic; private final KafkaConsumer consumer; - private final ExecutorService executorService; private final ObjectMapper mapper; private final ConsumerUseCases productionUseCases, restoreUseCases; @@ -125,7 +124,7 @@ public class TransferConsumer implements Runnable // in the same thread, it would block the completion of the initialization. // Hence, the app would not react to any signal (CTRL-C, for example) except // a KILL until the restoring is finished. - future = executorService.submit(() -> restore()); + future = CompletableFuture.runAsync(() -> restore()); } private void restore() @@ -214,7 +213,7 @@ public class TransferConsumer implements Runnable } running = true; - future = executorService.submit(this); + future = CompletableFuture.runAsync(this); log.info("started"); return result;