From: Kai Moritz Date: Mon, 4 Mar 2024 14:10:31 +0000 (+0100) Subject: WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN X-Git-Tag: rebase--2024-03-05--09-07~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a7a3d53e445556b56d4039fe1d5a00c294443e90;p=demos%2Fkafka%2Fchat WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java index 087d94d3..e61ecfe7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -13,7 +13,7 @@ import java.util.concurrent.CompletableFuture; @RequiredArgsConstructor @Slf4j -public class ChannelExecutor +public class ChannelTaskExecutor { private final ThreadPoolTaskExecutor taskExecutor; @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java index 92db79bf..015f7d51 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -6,26 +6,26 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor @Slf4j -public class ChannelRunner +public class ChannelTaskRunner { - private final ChannelExecutor infoChannelExecutor; - private final ChannelExecutor dataChannelExecutor; + private final ChannelTaskExecutor infoChannelTaskExecutor; + private final ChannelTaskExecutor dataChannelTaskExecutor; public void executeChannel() { - infoChannelExecutor.executeConsumerTask(); - dataChannelExecutor.executeConsumerTask(); + infoChannelTaskExecutor.executeConsumerTask(); + dataChannelTaskExecutor.executeConsumerTask(); } public void joinChannel() throws InterruptedException { - dataChannelExecutor.joinConsumerTaskJob(); + dataChannelTaskExecutor.joinConsumerTaskJob(); while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) { log.info("Waiting for {} to shut down...", infoChannel); Thread.sleep(1000); } - infoChannelExecutor.joinConsumerTaskJob(); + infoChannelTaskExecutor.joinConsumerTaskJob(); } private void joinChannel() diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index f44c9b52..2cb3971b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -16,18 +16,18 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final ChannelRunner channelRunner; + private final ChannelTaskRunner channelTaskRunner; @Override public void run(ApplicationArguments args) { - channelRunner.executeChannel(); + channelTaskRunner.executeChannel(); } @PreDestroy public void joinConsumerTasks() throws InterruptedException { - channelRunner.joinChannel(); + channelTaskRunner.joinChannel(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 33f8a6ec..81655894 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -39,25 +39,25 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ChannelRunner consumerTaskRunner( - ChannelExecutor infoChannelChannelExecutor, - ChannelExecutor dataChannelChannelExecutor, + ChannelTaskRunner consumerTaskRunner( + ChannelTaskExecutor infoChannelChannelTaskExecutor, + ChannelTaskExecutor dataChannelChannelTaskExecutor, InfoChannel infoChannel) { - return new ChannelRunner( - infoChannelChannelExecutor, - dataChannelChannelExecutor, + return new ChannelTaskRunner( + infoChannelChannelTaskExecutor, + dataChannelChannelTaskExecutor, infoChannel); } @Bean - ChannelExecutor infoChannelConsumerTaskExecutor( + ChannelTaskExecutor infoChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer infoChannelConsumer, WorkAssignor infoChannelWorkAssignor) { - return new ChannelExecutor( + return new ChannelTaskExecutor( taskExecutor, infoChannel, infoChannelConsumer, @@ -81,13 +81,13 @@ public class KafkaServicesConfiguration } @Bean - ChannelExecutor dataChannelConsumerTaskExecutor( + ChannelTaskExecutor dataChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, WorkAssignor dataChannelWorkAssignor) { - return new ChannelExecutor( + return new ChannelTaskExecutor( taskExecutor, dataChannel, dataChannelConsumer, diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 88a9b058..e54fac4e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -43,21 +43,21 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @BeforeAll public static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ChannelRunner channelRunner) + @Autowired ChannelTaskRunner channelTaskRunner) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, DATA_TOPIC, - channelRunner); + channelTaskRunner); } @AfterAll static void joinConsumerTasks( - @Autowired ChannelRunner channelRunner) + @Autowired ChannelTaskRunner channelTaskRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(channelRunner); + KafkaTestUtils.joinConsumerTasks(channelTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 6b75c466..2ec28265 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -44,20 +44,20 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ChannelRunner channelRunner) + @Autowired ChannelTaskRunner channelTaskRunner) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, DATA_TOPIC, - channelRunner); + channelTaskRunner); } @AfterAll static void joinConsumerTasks( - @Autowired ChannelRunner channelRunner) + @Autowired ChannelTaskRunner channelTaskRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(channelRunner); + KafkaTestUtils.joinConsumerTasks(channelTaskRunner); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index ebd1b680..ca30f667 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -44,7 +44,7 @@ public abstract class KafkaTestUtils KafkaTemplate messageTemplate, String infoTopic, String dataTopic, - ChannelRunner channelRunner) + ChannelTaskRunner channelTaskRunner) { send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -52,7 +52,7 @@ public abstract class KafkaTestUtils send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - channelRunner.executeChannel(); + channelTaskRunner.executeChannel(); } private static void send( @@ -72,8 +72,8 @@ public abstract class KafkaTestUtils new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException + public static void joinConsumerTasks(ChannelTaskRunner channelTaskRunner) throws InterruptedException { - channelRunner.joinChannel(); + channelTaskRunner.joinChannel(); } }