X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=5367fdf0841c873feda731b860add63fcf2b4a16;hb=e7150b6822c45c520db73a96785dc0a8a81f503b;hp=3148c6f00e735dfa0af5d1a77f3a033a76256b5b;hpb=5c338c58065988f7841c4ab9ee1b193e754da9b9;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 3148c6f0..5367fdf0 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -9,6 +9,7 @@ import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -43,37 +44,48 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Autowired ChatMessageChannel chatMessageChannel; + @Autowired + ChatRoomChannel chatRoomChannel; - CompletableFuture> chatRoomChannelConsumerJob; - CompletableFuture> chatMessageChannelConsumerJob; + CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture chatMessageChannelConsumerJob; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor.submitCompletable(chatMessageChannel); - chatRoomChannelConsumerJob.thenAccept(exceptionOptional -> - { - exceptionOptional.ifPresent(); - log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> { - log.error("SimpleConsumer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; }); } @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException + public void joinChatRoomChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() { - log.info("Signaling SimpleConsumer to quit its work"); - kafkaConsumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); }