From 50b9f17e84f6d7a155515c60596d9aa354e41bfe Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 21:20:39 +0200 Subject: [PATCH] WIP --- .../kafka/KafkaServicesApplicationRunner.java | 29 +++++++++---------- .../kafka/KafkaServicesConfiguration.java | 21 ++++++++++++++ .../kafka/KafkaChatHomeServiceTest.java | 2 -- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 70e41e1d..a863353e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -3,13 +3,13 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -21,22 +21,14 @@ import java.util.concurrent.CompletableFuture; prefix = "chat.backend", name = "services", havingValue = "kafka") -@Component +@RequiredArgsConstructor @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { - @Autowired - ChatBackendProperties properties; - - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - ConfigurableApplicationContext context; - - @Autowired - ChatRoomChannel chatRoomChannel; - @Autowired - Consumer chatRoomChannelConsumer; + private final ThreadPoolTaskExecutor taskExecutor; + private final ChatRoomChannel chatRoomChannel; + private final Consumer chatRoomChannelConsumer; + private final WorkAssignor workAssignor; CompletableFuture chatRoomChannelConsumerJob; @@ -44,8 +36,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Override public void run(ApplicationArguments args) throws Exception { - List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); - chatRoomChannelConsumer.subscribe(topics, chatRoomChannel); + workAssignor.assignWork(chatRoomChannelConsumer); log.info("Starting the consumer for the ChatRoomChannel"); chatRoomChannelConsumerJob = taskExecutor .submitCompletable(chatRoomChannel) @@ -65,4 +56,10 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner chatRoomChannelConsumerJob.join(); log.info("Joined the consumer of the ChatRoomChannel"); } + + + interface WorkAssignor + { + void assignWork(Consumer consumer); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index d63111a7..373fcb43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -16,12 +16,15 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -33,6 +36,24 @@ import java.util.Properties; @Configuration public class KafkaServicesConfiguration { + @Bean + KafkaServicesApplicationRunner kafkaServicesApplicationRunner( + ChatBackendProperties properties, + ThreadPoolTaskExecutor taskExecutor, + ChatRoomChannel chatRoomChannel, + Consumer chatRoomChannelConsumer) + { + return new KafkaServicesApplicationRunner( + taskExecutor, + chatRoomChannel, + chatRoomChannelConsumer, + consumer -> + { + List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); + consumer.subscribe(topics, chatRoomChannel); + }); + } + @Bean ChatHomeService kafkaChatHome( ChatBackendProperties properties, diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index c120e465..20a09369 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -49,8 +49,6 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest { final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; - static CompletableFuture CONSUMER_JOB; - @TestConfiguration @EnableConfigurationProperties(ChatBackendProperties.class) -- 2.20.1