From e7150b6822c45c520db73a96785dc0a8a81f503b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 09:11:16 +0200 Subject: [PATCH] NEU --- .../persistence/kafka/ChatMessageChannel.java | 11 +---- .../persistence/kafka/ChatRoomChannel.java | 12 +---- .../kafka/KafkaServicesConfiguration.java | 48 ++++++++++++------- 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 7b19bb6b..43ea3994 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -18,12 +18,11 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.concurrent.Callable; import java.util.stream.IntStream; @Slf4j -public class ChatMessageChannel implements Callable>, ConsumerRebalanceListener +public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -153,7 +152,7 @@ public class ChatMessageChannel implements Callable>, Consum } @Override - public Optional call() + public void run() { consumer.subscribe(List.of(topic)); @@ -191,15 +190,9 @@ public class ChatMessageChannel implements Callable>, Consum log.info("Received WakeupException, exiting!"); running = false; } - catch (Exception e) - { - log.error("Exiting abnormally!"); - return Optional.of(e); - } } log.info("Exiting normally"); - return Optional.empty(); } void loadMessages(ConsumerRecords records) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 9ea23b13..2a34c812 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -14,14 +14,12 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.List; -import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Callable; @RequiredArgsConstructor @Slf4j -public class ChatRoomChannel implements Callable> +public class ChatRoomChannel implements Runnable { private final String topic; private final Producer producer; @@ -70,7 +68,7 @@ public class ChatRoomChannel implements Callable> } @Override - public Optional call() + public void run() { consumer.assign(List.of(new TopicPartition(topic, 0))); @@ -93,15 +91,9 @@ public class ChatRoomChannel implements Callable> log.info("Received WakeupException, exiting!"); running = false; } - catch (Exception e) - { - log.error("Exiting abnormally!"); - return Optional.of(e); - } } log.info("Exiting normally"); - return Optional.empty(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 3148c6f0..5367fdf0 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -9,6 +9,7 @@ import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -43,37 +44,48 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Autowired ChatMessageChannel chatMessageChannel; + @Autowired + ChatRoomChannel chatRoomChannel; - CompletableFuture> chatRoomChannelConsumerJob; - CompletableFuture> chatMessageChannelConsumerJob; + CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture chatMessageChannelConsumerJob; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor.submitCompletable(chatMessageChannel); - chatRoomChannelConsumerJob.thenAccept(exceptionOptional -> - { - exceptionOptional.ifPresent(); - log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> { - log.error("SimpleConsumer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; }); } @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException + public void joinChatRoomChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() { - log.info("Signaling SimpleConsumer to quit its work"); - kafkaConsumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); } -- 2.20.1