1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
12 import jakarta.annotation.PreDestroy;
13 import lombok.extern.slf4j.Slf4j;
14 import org.apache.kafka.clients.consumer.Consumer;
15 import org.apache.kafka.clients.producer.Producer;
16 import org.springframework.beans.factory.annotation.Autowired;
17 import org.springframework.boot.ApplicationArguments;
18 import org.springframework.boot.ApplicationRunner;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.ConfigurableApplicationContext;
21 import org.springframework.context.annotation.Bean;
22 import org.springframework.context.annotation.Configuration;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import org.springframework.util.concurrent.ListenableFuture;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.Optional;
29 import java.util.concurrent.CompletableFuture;
/**
 * Wires up the Kafka-backed variants of the chat services and starts their
 * channel consumers. The class implements {@link ApplicationRunner}; its
 * {@code run} method submits both channels to the task executor.
 * <p>
 * The condition below matches the value {@code "kafka"} of a property under
 * the prefix {@code "chat.backend"}.
 */
32 @ConditionalOnProperty(
33 prefix = "chat.backend",
35 havingValue = "kafka")
38 public class KafkaServicesConfiguration implements ApplicationRunner
// Executor that the channel consumers are submitted to in run().
41 ThreadPoolTaskExecutor taskExecutor;
// NOTE(review): not referenced by any of the methods visible in this class - confirm it is still needed.
43 ConfigurableApplicationContext context;
// The two channels that run() submits to the task executor.
46 ChatMessageChannel chatMessageChannel;
48 ChatRoomChannel chatRoomChannel;
// Futures obtained from submitCompletable(); assigned in run() and awaited by the join* methods.
50 CompletableFuture<Void> chatRoomChannelConsumerJob;
51 CompletableFuture<Void> chatMessageChannelConsumerJob;
/**
 * Starts the consumers of both channels by submitting them to the task
 * executor. The futures obtained for the submitted channels are stored in
 * {@code chatRoomChannelConsumerJob} and {@code chatMessageChannelConsumerJob},
 * so that they can be joined later.
 *
 * @param args the application arguments; not read by the statements of this method
 * @throws Exception declared by the signature
 */
55 public void run(ApplicationArguments args) throws Exception
57 log.info("Starting the consumer for the ChatRoomChannel");
58 chatRoomChannelConsumerJob = taskExecutor
59 .submitCompletable(chatRoomChannel)
// Logs the failure e of the ChatRoomChannel consumer.
// NOTE(review): presumably executed in a completion handler chained onto the future - confirm.
62 log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
65 log.info("Starting the consumer for the ChatMessageChannel");
66 chatMessageChannelConsumerJob = taskExecutor
67 .submitCompletable(chatMessageChannel)
// Same failure logging for the ChatMessageChannel consumer.
70 log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
/**
 * Blocks until the consumer job of the ChatRoomChannel has completed and
 * logs before and after the wait.
 * <p>
 * NOTE(review): presumably intended as a shutdown hook - confirm.
 * NOTE(review): {@code chatRoomChannelConsumerJob} is only assigned in
 * {@code run}; if this method is called before {@code run}, the field is
 * still {@code null} and {@code join()} fails - verify the call order.
 */
76 public void joinChatRoomChannelConsumerJob()
78 log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
// join() waits for completion of the future stored by run().
79 chatRoomChannelConsumerJob.join();
80 log.info("Joined the consumer of the ChatRoomChannel");
/**
 * Blocks until the consumer job of the ChatMessageChannel has completed and
 * logs before and after the wait.
 * <p>
 * NOTE(review): presumably intended as a shutdown hook - confirm.
 * NOTE(review): {@code chatMessageChannelConsumerJob} is only assigned in
 * {@code run}; if this method is called before {@code run}, the field is
 * still {@code null} and {@code join()} fails - verify the call order.
 */
84 public void joinChatMessageChannelConsumerJob()
86 log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
// join() waits for completion of the future stored by run().
87 chatMessageChannelConsumerJob.join();
88 log.info("Joined the consumer of the ChatMessageChannel");
/**
 * Creates the {@link ChatHome} implementation {@code KafkaChatHome}.
 *
 * @param shardingStrategy the sharding strategy handed to the new instance
 * @param chatMessageChannel the message channel handed to the new instance
 * @return a new {@code KafkaChatHome} built from both arguments
 */
93 ChatHome kafkaChatHome(
94 ShardingStrategy shardingStrategy,
95 ChatMessageChannel chatMessageChannel)
97 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
/**
 * Creates the factory for chat rooms.
 *
 * @param chatRoomChannel the channel handed to the new factory
 * @return a new {@code KafkaChatRoomFactory} wrapping the given channel
 */
101 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
103 return new KafkaChatRoomFactory(chatRoomChannel);
/**
 * Creates the {@code ChatRoomChannel}.
 *
 * @param properties source of the topic name ({@code getKafka().getTopic()})
 *        and of the chat-room buffer size ({@code getChatroomBufferSize()})
 * @param chatRoomChannelProducer the producer handed to the channel
 * @param chatRoomChannelConsumer the consumer handed to the channel
 * @param shardingStrategy NOTE(review): presumably handed to the channel as well - confirm
 * @param chatMessageChannel NOTE(review): presumably handed to the channel as well - confirm
 * @return the newly created channel
 */
107 ChatRoomChannel chatRoomChannel(
108 ChatBackendProperties properties,
109 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
110 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
111 ShardingStrategy shardingStrategy,
112 ChatMessageChannel chatMessageChannel,
115 return new ChatRoomChannel(
// Topic name as configured in the Kafka section of the properties.
116 properties.getKafka().getTopic(),
117 chatRoomChannelProducer,
118 chatRoomChannelConsumer,
122 properties.getChatroomBufferSize());
/**
 * Creates the {@code ChatMessageChannel}.
 *
 * @param properties source of the topic name ({@code getKafka().getTopic()})
 *        and of the number of partitions ({@code getKafka().getNumPartitions()})
 * @param chatMessageChannelProducer the producer handed to the channel
 * @param chatMessageChannelConsumer the consumer handed to the channel
 * @return the newly created channel
 */
126 ChatMessageChannel chatMessageChannel(
127 ChatBackendProperties properties,
128 Producer<String, MessageTo> chatMessageChannelProducer,
129 Consumer<String, MessageTo> chatMessageChannelConsumer,
132 return new ChatMessageChannel(
// NOTE(review): this is the same getKafka().getTopic() value that chatRoomChannel(...) reads - confirm both channels are meant to use one topic.
133 properties.getKafka().getTopic(),
134 chatMessageChannelProducer,
135 chatMessageChannelConsumer,
// Same partition count that shardingStrategy(...) passes to KafkaLikeShardingStrategy.
137 properties.getKafka().getNumPartitions());
/**
 * Creates the sharding strategy.
 *
 * @param properties source of the number of partitions ({@code getKafka().getNumPartitions()})
 * @return a new {@link KafkaLikeShardingStrategy} constructed with that number
 */
141 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
143 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
149 return ZoneId.systemDefault();