import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
prefix = "chat.backend",
name = "services",
havingValue = "kafka")
-@Component
+@RequiredArgsConstructor
@Slf4j
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
- @Autowired
- ChatBackendProperties properties;
-
- @Autowired
- ThreadPoolTaskExecutor taskExecutor;
- @Autowired
- ConfigurableApplicationContext context;
-
- @Autowired
- ChatRoomChannel chatRoomChannel;
- @Autowired
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+ private final ThreadPoolTaskExecutor taskExecutor;
+ private final ChatRoomChannel chatRoomChannel;
+ private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+ private final WorkAssignor workAssignor;
CompletableFuture<Void> chatRoomChannelConsumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
- List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
- chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+ workAssignor.assignWork(chatRoomChannelConsumer);
log.info("Starting the consumer for the ChatRoomChannel");
chatRoomChannelConsumerJob = taskExecutor
.submitCompletable(chatRoomChannel)
chatRoomChannelConsumerJob.join();
log.info("Joined the consumer of the ChatRoomChannel");
}
+
+
+ interface WorkAssignor
+ {
+ void assignWork(Consumer<?, ?> consumer);
+ }
}
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@Configuration
public class KafkaServicesConfiguration
{
+ @Bean
+ KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+ ChatBackendProperties properties,
+ ThreadPoolTaskExecutor taskExecutor,
+ ChatRoomChannel chatRoomChannel,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer)
+ {
+ return new KafkaServicesApplicationRunner(
+ taskExecutor,
+ chatRoomChannel,
+ chatRoomChannelConsumer,
+ consumer ->
+ {
+ List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+ consumer.subscribe(topics, chatRoomChannel);
+ });
+ }
+
@Bean
ChatHomeService kafkaChatHome(
ChatBackendProperties properties,