From 571bd14522b51222cb55d5bc6516cce09eb7c04b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 4 Mar 2024 15:09:15 +0100 Subject: [PATCH] WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- .../implementation/kafka/ChannelExecutor.java | 16 +++++++++------- .../implementation/kafka/ChannelRunner.java | 19 ++++++++++--------- .../implementation/kafka/DataChannel.java | 4 +++- .../implementation/kafka/InfoChannel.java | 4 +++- .../kafka/KafkaServicesApplicationRunner.java | 4 ++-- .../implementation/kafka/KafkaTestUtils.java | 4 ++-- 6 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java index fb554483..087d94d3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -15,7 +16,8 @@ import java.util.concurrent.CompletableFuture; public class ChannelExecutor { private final ThreadPoolTaskExecutor taskExecutor; - private final Runnable consumerTask; + @Getter + private final Channel channel; private final Consumer consumer; private final WorkAssignor workAssignor; @@ -25,12 +27,12 @@ public class ChannelExecutor public void executeConsumerTask() { workAssignor.assignWork(consumer); - log.info("Starting the consumer-task for {}", consumerTask); + log.info("Starting the consumer-task for {}", channel); consumerTaskJob = taskExecutor - .submitCompletable(consumerTask) + .submitCompletable(channel) .exceptionally(e -> { - log.error("The consumer-task for {} exited abnormally!", consumerTask, e); + log.error("The consumer-task for {} exited abnormally!", channel, e); return null; }); } @@ -38,10 +40,10 @@ public class ChannelExecutor @PreDestroy public void joinConsumerTaskJob() { - log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + log.info("Signaling the consumer-task for {} to quit its work", channel); consumer.wakeup(); - log.info("Waiting for the consumer of {} to finish its work", consumerTask); + log.info("Waiting for the consumer of {} to finish its work", channel); consumerTaskJob.join(); - log.info("Joined the consumer-task for {}", consumerTask); + log.info("Joined the consumer-task for {}", channel); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java index 95204486..92db79bf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java @@ -8,24 +8,25 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ChannelRunner { - private final ChannelExecutor infoChannelChannelExecutor; - private final ChannelExecutor dataChannelChannelExecutor; - private final InfoChannel infoChannel; + private final ChannelExecutor infoChannelExecutor; + private final ChannelExecutor dataChannelExecutor; - public void executeConsumerTasks() + public void executeChannel() { - infoChannelChannelExecutor.executeConsumerTask(); - dataChannelChannelExecutor.executeConsumerTask(); + infoChannelExecutor.executeConsumerTask(); + dataChannelExecutor.executeConsumerTask(); } - public void joinConsumerTasks() throws InterruptedException + public void joinChannel() throws InterruptedException { - dataChannelChannelExecutor.joinConsumerTaskJob(); + dataChannelExecutor.joinConsumerTaskJob(); while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) { log.info("Waiting for {} to shut down...", infoChannel); Thread.sleep(1000); } - infoChannelChannelExecutor.joinConsumerTaskJob(); + infoChannelExecutor.joinConsumerTaskJob(); } + + private void joinChannel() } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 5a5a6838..0b572f48 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Producer; @@ -24,8 +25,9 @@ import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceId" }) @Slf4j -public class DataChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Channel, ConsumerRebalanceListener { private final String instanceId; private final String topic; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 13556dab..ec4a5ebd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -6,6 +6,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatR import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -24,8 +25,9 @@ import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceUri" }) @Slf4j -public class InfoChannel implements Runnable +public class InfoChannel implements Channel { private final String topic; private final Producer producer; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index ba427079..f44c9b52 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -22,12 +22,12 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Override public void run(ApplicationArguments args) { - channelRunner.executeConsumerTasks(); + channelRunner.executeChannel(); } @PreDestroy public void joinConsumerTasks() throws InterruptedException { - channelRunner.joinConsumerTasks(); + channelRunner.joinChannel(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index eb1a19d5..ebd1b680 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -52,7 +52,7 @@ public abstract class KafkaTestUtils send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - channelRunner.executeConsumerTasks(); + channelRunner.executeChannel(); } private static void send( @@ -74,6 +74,6 @@ public abstract class KafkaTestUtils public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException { - channelRunner.joinConsumerTasks(); + channelRunner.joinChannel(); } } -- 2.20.1