From 42448259d41e4658753590f6f1b6a0cd7dc9e8f9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 4 Mar 2024 15:05:23 +0100 Subject: [PATCH] WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- .../implementation/kafka/ChannelExecutor.java | 2 +- .../implementation/kafka/ChannelRunner.java | 14 ++++++------- .../kafka/KafkaServicesApplicationRunner.java | 6 +++--- .../kafka/KafkaServicesConfiguration.java | 21 +++++++++---------- .../chat/backend/KafkaConfigurationIT.java | 8 +++---- .../kafka/KafkaChatHomeServiceTest.java | 8 +++---- .../implementation/kafka/KafkaTestUtils.java | 8 +++---- 7 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java index 9425bdf4..fb554483 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java @@ -12,7 +12,7 @@ import java.util.concurrent.CompletableFuture; @RequiredArgsConstructor @Slf4j -public class ConsumerTaskExecutor +public class ChannelExecutor { private final ThreadPoolTaskExecutor taskExecutor; private final Runnable consumerTask; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java index 0c6de1d5..95204486 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java @@ -6,26 +6,26 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor @Slf4j -public class ConsumerTaskRunner +public class ChannelRunner { - private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; - private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + private final ChannelExecutor infoChannelChannelExecutor; + private final ChannelExecutor dataChannelChannelExecutor; private final InfoChannel infoChannel; public void executeConsumerTasks() { - infoChannelConsumerTaskExecutor.executeConsumerTask(); - dataChannelConsumerTaskExecutor.executeConsumerTask(); + infoChannelChannelExecutor.executeConsumerTask(); + dataChannelChannelExecutor.executeConsumerTask(); } public void joinConsumerTasks() throws InterruptedException { - dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + dataChannelChannelExecutor.joinConsumerTaskJob(); while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) { log.info("Waiting for {} to shut down...", infoChannel); Thread.sleep(1000); } - infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + infoChannelChannelExecutor.joinConsumerTaskJob(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 69e94c9e..ba427079 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -16,18 +16,18 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final ConsumerTaskRunner consumerTaskRunner; + private final ChannelRunner channelRunner; @Override public void run(ApplicationArguments args) { - consumerTaskRunner.executeConsumerTasks(); + channelRunner.executeConsumerTasks(); } @PreDestroy public void joinConsumerTasks() throws InterruptedException { - consumerTaskRunner.joinConsumerTasks(); + channelRunner.joinConsumerTasks(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 33371279..33f8a6ec 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; @@ -40,25 +39,25 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskRunner consumerTaskRunner( - ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + ChannelRunner consumerTaskRunner( + ChannelExecutor infoChannelChannelExecutor, + ChannelExecutor dataChannelChannelExecutor, InfoChannel infoChannel) { - return new ConsumerTaskRunner( - infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor, + return new ChannelRunner( + infoChannelChannelExecutor, + dataChannelChannelExecutor, infoChannel); } @Bean - ConsumerTaskExecutor infoChannelConsumerTaskExecutor( + ChannelExecutor infoChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer infoChannelConsumer, WorkAssignor infoChannelWorkAssignor) { - return new ConsumerTaskExecutor( + return new ChannelExecutor( taskExecutor, infoChannel, infoChannelConsumer, @@ -82,13 +81,13 @@ public class KafkaServicesConfiguration } @Bean - ConsumerTaskExecutor dataChannelConsumerTaskExecutor( + ChannelExecutor dataChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, WorkAssignor dataChannelWorkAssignor) { - return new ConsumerTaskExecutor( + return new ChannelExecutor( taskExecutor, dataChannel, dataChannelConsumer, diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index e01e012d..88a9b058 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -43,21 +43,21 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @BeforeAll public static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelRunner channelRunner) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, DATA_TOPIC, - consumerTaskRunner); + channelRunner); } @AfterAll static void joinConsumerTasks( - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelRunner channelRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); + KafkaTestUtils.joinConsumerTasks(channelRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 180ff152..6b75c466 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -44,20 +44,20 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelRunner channelRunner) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, DATA_TOPIC, - consumerTaskRunner); + channelRunner); } @AfterAll static void joinConsumerTasks( - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelRunner channelRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); + KafkaTestUtils.joinConsumerTasks(channelRunner); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 2ede2029..eb1a19d5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -44,7 +44,7 @@ public abstract class KafkaTestUtils KafkaTemplate messageTemplate, String infoTopic, String dataTopic, - ConsumerTaskRunner consumerTaskRunner) + ChannelRunner channelRunner) { send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -52,7 +52,7 @@ public abstract class KafkaTestUtils send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - consumerTaskRunner.executeConsumerTasks(); + channelRunner.executeConsumerTasks(); } private static void send( @@ -72,8 +72,8 @@ public abstract class KafkaTestUtils new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException + public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException { - consumerTaskRunner.joinConsumerTasks(); + channelRunner.joinConsumerTasks(); } } -- 2.20.1