1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import jakarta.annotation.PreDestroy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.boot.ApplicationArguments;
19 import org.springframework.boot.ApplicationRunner;
20 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
21 import org.springframework.context.ConfigurableApplicationContext;
22 import org.springframework.context.annotation.Bean;
23 import org.springframework.context.annotation.Configuration;
24 import org.springframework.kafka.support.serializer.JsonDeserializer;
25 import org.springframework.kafka.support.serializer.JsonSerializer;
26 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
28 import java.time.Clock;
29 import java.time.ZoneId;
30 import java.util.Properties;
31 import java.util.concurrent.CompletableFuture;
// Wiring for the Kafka-based persistence of the chat backend.
// The class is guarded by a property condition with prefix "chat.backend" and
// expected value "kafka"; the name of the property is not visible in this excerpt.
// As an ApplicationRunner, its run(ApplicationArguments) method is the entry point
// that starts the channel consumers.
34 @ConditionalOnProperty(
35 prefix = "chat.backend",
37 havingValue = "kafka")
40 public class KafkaServicesConfiguration implements ApplicationRunner
// Executor that the two channels are submitted to in run().
43 ThreadPoolTaskExecutor taskExecutor;
// Application context; its use is not visible in this excerpt.
45 ConfigurableApplicationContext context;
// The channels that run() submits to the executor.
48 ChatMessageChannel chatMessageChannel;
50 ChatRoomChannel chatRoomChannel;
// Futures of the submitted channel tasks: assigned in run(), awaited in the
// joinChatRoomChannelConsumerJob() / joinChatMessageChannelConsumerJob() methods.
52 CompletableFuture<Void> chatRoomChannelConsumerJob;
53 CompletableFuture<Void> chatMessageChannelConsumerJob;
// Starts both channel consumers: each channel is submitted to the task executor
// and the resulting future is stored in the matching *ConsumerJob field.
// The args parameter is not used in the lines visible here.
57 public void run(ApplicationArguments args) throws Exception
59 log.info("Starting the consumer for the ChatRoomChannel");
60 chatRoomChannelConsumerJob = taskExecutor
61 .submitCompletable(chatRoomChannel)
// Logs the failure together with its cause e. The handler this statement belongs
// to is not visible in this excerpt — presumably a failure callback on the future;
// confirm what else it does.
64 log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
67 log.info("Starting the consumer for the ChatMessageChannel");
68 chatMessageChannelConsumerJob = taskExecutor
69 .submitCompletable(chatMessageChannel)
// Same failure logging for the ChatMessageChannel task (surrounding handler not visible).
72 log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
// Blocks the calling thread until the ChatRoomChannel consumer task has completed,
// logging before and after the wait.
// NOTE(review): PreDestroy is imported by this file, so this is presumably a
// shutdown hook — the annotation is not visible here, confirm.
// NOTE(review): the future is only assigned in run(); calling this before run()
// would dereference null — confirm the call order is guaranteed.
78 public void joinChatRoomChannelConsumerJob()
80 log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
81 chatRoomChannelConsumerJob.join();
82 log.info("Joined the consumer of the ChatRoomChannel");
// Blocks the calling thread until the ChatMessageChannel consumer task has completed,
// logging before and after the wait.
// NOTE(review): PreDestroy is imported by this file, so this is presumably a
// shutdown hook — the annotation is not visible here, confirm.
// NOTE(review): the future is only assigned in run(); calling this before run()
// would dereference null — confirm the call order is guaranteed.
86 public void joinChatMessageChannelConsumerJob()
88 log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
89 chatMessageChannelConsumerJob.join();
90 log.info("Joined the consumer of the ChatMessageChannel");
// Creates the ChatHome implementation for this configuration: a KafkaChatHome
// constructed from the given sharding strategy and chat-message channel.
95 ChatHome kafkaChatHome(
96 ShardingStrategy shardingStrategy,
97 ChatMessageChannel chatMessageChannel)
99 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
// Creates the KafkaChatRoomFactory, handing it the given ChatRoomChannel.
103 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
105 return new KafkaChatRoomFactory(chatRoomChannel);
// Creates the ChatRoomChannel from the configured topic, the chat-room producer and
// consumer, and the configured chat-room buffer size.
// The parameter list and the constructor arguments are only partially visible in
// this excerpt; the remaining arguments are not documented here.
// NOTE(review): the topic comes from properties.getKafka().getTopic(), the same
// accessor chatMessageChannel(...) uses — confirm both channels are meant to share
// one topic.
109 ChatRoomChannel chatRoomChannel(
110 ChatBackendProperties properties,
111 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
112 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
113 ShardingStrategy shardingStrategy,
114 ChatMessageChannel chatMessageChannel,
117 return new ChatRoomChannel(
118 properties.getKafka().getTopic(),
119 chatRoomChannelProducer,
120 chatRoomChannelConsumer,
124 properties.getChatroomBufferSize());
// Creates the KafkaProducer used for the chat-room channel: Integer keys,
// ChatRoomTo values. The constructor arguments are not visible in this excerpt;
// presumably the three parameters are passed through — confirm.
128 Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
129 Properties producerProperties,
130 IntegerSerializer integerSerializer,
131 JsonSerializer<ChatRoomTo> chatRoomSerializer)
133 return new KafkaProducer<>(
// Provides a fresh IntegerSerializer (the key serializer type requested by
// chatRoomChannelProducer).
140 IntegerSerializer integerSerializer()
142 return new IntegerSerializer();
// Creates the JSON serializer for ChatRoomTo values. Any further configuration of
// the serializer and the return statement are not visible in this excerpt.
146 JsonSerializer<ChatRoomTo> chatRoomSerializer()
148 JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
// Creates the KafkaConsumer used for the chat-room channel: Integer keys,
// ChatRoomTo values; chatRoomDeserializer is the last constructor argument. The
// other constructor arguments are not visible in this excerpt.
// NOTE(review): the Properties parameter is named producerProperties although this
// method builds a consumer, and a separate consumerProperties(...) factory exists in
// this class. If the parameter name selects which Properties instance is injected,
// the consumer is configured with the producer settings — confirm whether
// consumerProperties was intended.
153 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
154 Properties producerProperties,
155 IntegerDeserializer integerDeserializer,
156 JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
158 return new KafkaConsumer<>(
161 chatRoomDeserializer);
// Provides a fresh IntegerDeserializer (the key deserializer type requested by
// chatRoomChannelConsumer).
165 IntegerDeserializer integerDeserializer()
167 return new IntegerDeserializer();
// Creates the JSON deserializer for ChatRoomTo values. Any further configuration of
// the deserializer and the return statement are not visible in this excerpt.
171 JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
173 JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
// Creates the sharding strategy: a KafkaLikeShardingStrategy constructed with the
// configured number of Kafka partitions (properties.getKafka().getNumPartitions()).
178 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
180 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
// Creates the ChatMessageChannel from the configured topic, the chat-message
// producer and consumer, and the configured number of partitions.
// The parameter list and the constructor arguments are only partially visible in
// this excerpt; the remaining arguments are not documented here.
// NOTE(review): the topic comes from properties.getKafka().getTopic(), the same
// accessor chatRoomChannel(...) uses — confirm both channels are meant to share
// one topic.
184 ChatMessageChannel chatMessageChannel(
185 ChatBackendProperties properties,
186 Producer<String, MessageTo> chatMessageChannelProducer,
187 Consumer<String, MessageTo> chatMessageChannelConsumer,
190 return new ChatMessageChannel(
191 properties.getKafka().getTopic(),
192 chatMessageChannelProducer,
193 chatMessageChannelConsumer,
195 properties.getKafka().getNumPartitions());
// Creates the KafkaProducer used for the chat-message channel: String keys,
// MessageTo values. The constructor arguments are not visible in this excerpt;
// presumably the three parameters are passed through — confirm.
199 Producer<String, MessageTo> chatMessageChannelProducer(
200 Properties producerProperties,
201 StringSerializer stringSerializer,
202 JsonSerializer<MessageTo> messageSerializer)
204 return new KafkaProducer<>(
// Provides a fresh StringSerializer (the key serializer type requested by
// chatMessageChannelProducer).
211 StringSerializer stringSerializer()
213 return new StringSerializer();
// Creates the JSON serializer for MessageTo values. Any further configuration of
// the serializer and the return statement are not visible in this excerpt.
// NOTE(review): chatMessageChannelProducer(...) names its parameter
// messageSerializer, not chatMessageSerializer — presumably matched by type; confirm.
217 JsonSerializer<MessageTo> chatMessageSerializer()
219 JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
// Creates the KafkaConsumer used for the chat-message channel: String keys,
// MessageTo values; messageDeserializer is the last constructor argument. The
// other constructor arguments are not visible in this excerpt.
// NOTE(review): the Properties parameter is named producerProperties although this
// method builds a consumer, and a separate consumerProperties(...) factory exists in
// this class. If the parameter name selects which Properties instance is injected,
// the consumer is configured with the producer settings — confirm whether
// consumerProperties was intended.
224 Consumer<String, MessageTo> chatMessageChannelConsumer(
225 Properties producerProperties,
226 StringDeserializer stringDeserializer,
227 JsonDeserializer<MessageTo> messageDeserializer)
229 return new KafkaConsumer<>(
232 messageDeserializer);
// Provides a fresh StringDeserializer (the key deserializer type requested by
// chatMessageChannelConsumer).
236 StringDeserializer stringDeserializer()
238 return new StringDeserializer();
// Creates the JSON deserializer for MessageTo values. Any further configuration of
// the deserializer and the return statement are not visible in this excerpt.
// NOTE(review): chatMessageChannelConsumer(...) names its parameter
// messageDeserializer, not chatMessageDeserializer — presumably matched by type; confirm.
242 JsonDeserializer<MessageTo> chatMessageDeserializer()
244 JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
// Builds the Properties object for the Kafka producers, starting from an empty
// Properties instance. The entries that are set and the return statement are not
// visible in this excerpt.
249 Properties producerProperties(ChatBackendProperties chatBackendProperties)
251 Properties properties = new Properties();
// Builds the Properties object for the Kafka consumers, starting from an empty
// Properties instance. The entries that are set and the return statement are not
// visible in this excerpt.
// NOTE(review): neither consumer factory visible in this class names a
// consumerProperties parameter (both declare producerProperties) — confirm this
// object is actually used.
256 Properties consumerProperties(ChatBackendProperties chatBackendProperties)
258 Properties properties = new Properties();
265 return ZoneId.systemDefault();