From fe12b154e9817bbefe01f3c6b8173f08525f6474 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 4 Mar 2024 15:33:11 +0100 Subject: [PATCH] WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- .../kafka/ChannelTaskExecutor.java | 2 +- .../kafka/ChannelTaskRunner.java | 18 ++++++++++++------ .../implementation/kafka/DataChannel.java | 1 + .../implementation/kafka/InfoChannel.java | 1 + .../kafka/KafkaServicesConfiguration.java | 3 +-- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java index e61ecfe7..d274cf24 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -38,7 +38,7 @@ public class ChannelTaskExecutor } @PreDestroy - public void joinConsumerTaskJob() + public void join() { log.info("Signaling the consumer-task for {} to quit its work", channel); consumer.wakeup(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java index 015f7d51..9e8576d0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -19,14 +19,20 @@ public class ChannelTaskRunner public void joinChannel() throws InterruptedException { - dataChannelTaskExecutor.joinConsumerTaskJob(); - while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) + joinChannel(dataChannelTaskExecutor); + joinChannel(infoChannelTaskExecutor); + } + + private void joinChannel( + ChannelTaskExecutor channelTaskExecutor) + throws InterruptedException + { + Channel channel = channelTaskExecutor.getChannel(); + while (channel.getChannelState() != ChannelState.SHUTTING_DOWN) { - log.info("Waiting for {} to shut down...", infoChannel); + log.info("Waiting for {} to shut down...", channel); Thread.sleep(1000); } - infoChannelTaskExecutor.joinConsumerTaskJob(); + channelTaskExecutor.join(); } - - private void joinChannel() } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 0b572f48..2468af5a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -230,6 +230,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index ec4a5ebd..7665faea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -210,6 +210,7 @@ public class InfoChannel implements Channel catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 1088cabf..857f8ecd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -46,8 +46,7 @@ public class KafkaServicesConfiguration { return new ChannelTaskRunner( infoChannelTaskExecutor, - dataChannelTaskExecutor, - infoChannel); + dataChannelTaskExecutor); } @Bean -- 2.20.1