1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import jakarta.annotation.PreDestroy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.serialization.IntegerDeserializer;
16 import org.apache.kafka.common.serialization.IntegerSerializer;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.boot.ApplicationArguments;
21 import org.springframework.boot.ApplicationRunner;
22 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
23 import org.springframework.context.ConfigurableApplicationContext;
24 import org.springframework.context.annotation.Bean;
25 import org.springframework.context.annotation.Configuration;
26 import org.springframework.kafka.support.serializer.JsonDeserializer;
27 import org.springframework.kafka.support.serializer.JsonSerializer;
28 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
30 import java.time.Clock;
31 import java.time.ZoneId;
32 import java.util.Properties;
33 import java.util.concurrent.CompletableFuture;
/**
 * Wiring of the Kafka-backed services of the chat backend.
 * <p>
 * The class is guarded by {@code @ConditionalOnProperty}: a property below the
 * prefix {@code chat.backend} must have the value {@code kafka}.
 * NOTE(review): the line that names the property is not visible in this excerpt.
 * <p>
 * As an {@link ApplicationRunner}, the class starts the consumers of both
 * channels in {@link #run(ApplicationArguments)}.
 */
36 @ConditionalOnProperty(
37 prefix = "chat.backend",
39 havingValue = "kafka")
42 public class KafkaServicesConfiguration implements ApplicationRunner
// Executor, that the channel consumers are submitted to in run()
45 ThreadPoolTaskExecutor taskExecutor;
// NOTE(review): not referenced in the visible lines of this class
47 ConfigurableApplicationContext context;
// The channels, that are submitted to the executor as consumer jobs in run()
50 ChatMessageChannel chatMessageChannel;
52 ChatRoomChannel chatRoomChannel;
// Futures of the submitted consumer jobs: assigned in run(), joined in the join...() methods
54 CompletableFuture<Void> chatRoomChannelConsumerJob;
55 CompletableFuture<Void> chatMessageChannelConsumerJob;
/**
 * Starts the consumers of the ChatRoomChannel and the ChatMessageChannel.
 * <p>
 * Each channel is handed to the {@code taskExecutor} via
 * {@code submitCompletable(..)} and the result is stored in the corresponding
 * job field, so that it can be joined later. The start of each consumer is
 * logged, and an error is logged, if a consumer exits abnormally.
 * NOTE(review): the lines that attach the error handling to the submitted jobs
 * are not visible in this excerpt.
 *
 * @param args the application arguments; not used in the visible lines
 * @throws Exception as declared by {@link ApplicationRunner#run(ApplicationArguments)}
 */
59 public void run(ApplicationArguments args) throws Exception
61 log.info("Starting the consumer for the ChatRoomChannel");
62 chatRoomChannelConsumerJob = taskExecutor
63 .submitCompletable(chatRoomChannel)
66 log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
69 log.info("Starting the consumer for the ChatMessageChannel");
70 chatMessageChannelConsumerJob = taskExecutor
71 .submitCompletable(chatMessageChannel)
74 log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
/**
 * Blocks until the consumer job of the ChatRoomChannel has completed.
 * <p>
 * Logs a message before and after the call to {@code join()}.
 * NOTE(review): the job field is only assigned in {@code run(..)} in the
 * visible code, so calling this method before {@code run(..)} would fail with
 * a {@link NullPointerException} - confirm that this cannot happen.
 */
80 public void joinChatRoomChannelConsumerJob()
82 log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
83 chatRoomChannelConsumerJob.join();
84 log.info("Joined the consumer of the ChatRoomChannel");
/**
 * Blocks until the consumer job of the ChatMessageChannel has completed.
 * <p>
 * Logs a message before and after the call to {@code join()}.
 * NOTE(review): the job field is only assigned in {@code run(..)} in the
 * visible code, so calling this method before {@code run(..)} would fail with
 * a {@link NullPointerException} - confirm that this cannot happen.
 */
88 public void joinChatMessageChannelConsumerJob()
90 log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
91 chatMessageChannelConsumerJob.join();
92 log.info("Joined the consumer of the ChatMessageChannel");
/**
 * Creates the {@link ChatHome} of the Kafka-backed setup.
 *
 * @param shardingStrategy the strategy, that is passed on to the {@code KafkaChatHome}
 * @param chatMessageChannel the channel, that is passed on to the {@code KafkaChatHome}
 * @return a new {@code KafkaChatHome}, built from the two arguments
 */
97 ChatHome kafkaChatHome(
98 ShardingStrategy shardingStrategy,
99 ChatMessageChannel chatMessageChannel)
101 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
/**
 * Creates the factory for chat rooms.
 *
 * @param chatRoomChannel the channel, that is handed to the factory
 * @return a new {@code KafkaChatRoomFactory}, that wraps the given channel
 */
105 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
107 return new KafkaChatRoomFactory(chatRoomChannel);
/**
 * Creates the {@code ChatRoomChannel}.
 * <p>
 * The first constructor argument is the topic name, read from
 * {@code properties.getKafka().getTopic()}, followed by the producer and the
 * consumer of the channel. The last argument is
 * {@code properties.getChatroomBufferSize()}.
 * NOTE(review): some parameter and argument lines of this method are not
 * visible in this excerpt.
 * NOTE(review): {@code chatMessageChannel(..)} reads the same topic property -
 * confirm, that both channels are meant to share one topic.
 *
 * @param properties the configuration of the chat backend
 * @param chatRoomChannelProducer the producer, that is used by the channel
 * @param chatRoomChannelConsumer the consumer, that is used by the channel
 * @param shardingStrategy the sharding strategy
 * @param chatMessageChannel the channel for the chat messages
 * @return the newly created channel
 */
111 ChatRoomChannel chatRoomChannel(
112 ChatBackendProperties properties,
113 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
114 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
115 ShardingStrategy shardingStrategy,
116 ChatMessageChannel chatMessageChannel,
119 return new ChatRoomChannel(
120 properties.getKafka().getTopic(),
121 chatRoomChannelProducer,
122 chatRoomChannelConsumer,
126 properties.getChatroomBufferSize());
/**
 * Creates the Kafka producer for the chat room channel, with {@code Integer}
 * keys and {@code ChatRoomTo} values.
 * <p>
 * The producer is built from the shared default producer properties.
 * NOTE(review): the lines, that hand the serializers to the constructor, are
 * not visible in this excerpt.
 *
 * @param defaultProducerProperties the shared base configuration for producers
 * @param integerSerializer the serializer for the keys
 * @param chatRoomSerializer the serializer for the values
 * @return a new {@link KafkaProducer}
 */
130 Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
131 Properties defaultProducerProperties,
132 IntegerSerializer integerSerializer,
133 JsonSerializer<ChatRoomTo> chatRoomSerializer)
135 return new KafkaProducer<>(
136 defaultProducerProperties,
/**
 * Provides the serializer for {@code Integer} keys.
 *
 * @return a new {@link IntegerSerializer}
 */
142 IntegerSerializer integerSerializer()
144 return new IntegerSerializer();
/**
 * Provides the JSON serializer for {@code ChatRoomTo} values.
 * <p>
 * NOTE(review): only the creation of the serializer is visible in this
 * excerpt - any further configuration and the return statement are not.
 */
148 JsonSerializer<ChatRoomTo> chatRoomSerializer()
150 JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
155 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
156 Properties defaultConsumerProperties,
157 IntegerDeserializer integerDeserializer,
158 JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
160 Properties properties = new Properties(defaultConsumerProperties);
161 properties.setProperty(
162 ConsumerConfig.GROUP_ID_CONFIG,
163 "chat_room_channel");
164 return new KafkaConsumer<>(
167 chatRoomDeserializer);
/**
 * Provides the deserializer for {@code Integer} keys.
 *
 * @return a new {@link IntegerDeserializer}
 */
171 IntegerDeserializer integerDeserializer()
173 return new IntegerDeserializer();
/**
 * Provides the JSON deserializer for {@code ChatRoomTo} values.
 * <p>
 * NOTE(review): only the creation of the deserializer is visible in this
 * excerpt - any further configuration and the return statement are not.
 */
177 JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
179 JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
/**
 * Creates the sharding strategy of the Kafka-backed setup.
 *
 * @param properties the configuration of the chat backend
 * @return a new {@link KafkaLikeShardingStrategy}, created with the value of
 *         {@code properties.getKafka().getNumPartitions()}
 */
184 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
186 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
/**
 * Creates the {@code ChatMessageChannel}.
 * <p>
 * The first constructor argument is the topic name, read from
 * {@code properties.getKafka().getTopic()}, followed by the producer and the
 * consumer of the channel. The last argument is
 * {@code properties.getKafka().getNumPartitions()}.
 * NOTE(review): some parameter and argument lines of this method are not
 * visible in this excerpt.
 * NOTE(review): {@code chatRoomChannel(..)} reads the same topic property -
 * confirm, that both channels are meant to share one topic.
 *
 * @param properties the configuration of the chat backend
 * @param chatMessageChannelProducer the producer, that is used by the channel
 * @param chatMessageChannelConsumer the consumer, that is used by the channel
 * @return the newly created channel
 */
190 ChatMessageChannel chatMessageChannel(
191 ChatBackendProperties properties,
192 Producer<String, MessageTo> chatMessageChannelProducer,
193 Consumer<String, MessageTo> chatMessageChannelConsumer,
196 return new ChatMessageChannel(
197 properties.getKafka().getTopic(),
198 chatMessageChannelProducer,
199 chatMessageChannelConsumer,
201 properties.getKafka().getNumPartitions());
/**
 * Creates the Kafka producer for the chat message channel, with
 * {@code String} keys and {@code MessageTo} values.
 * <p>
 * The producer is built from the shared default producer properties.
 * NOTE(review): the lines, that hand the serializers to the constructor, are
 * not visible in this excerpt.
 *
 * @param defaultProducerProperties the shared base configuration for producers
 * @param stringSerializer the serializer for the keys
 * @param messageSerializer the serializer for the values
 * @return a new {@link KafkaProducer}
 */
205 Producer<String, MessageTo> chatMessageChannelProducer(
206 Properties defaultProducerProperties,
207 StringSerializer stringSerializer,
208 JsonSerializer<MessageTo> messageSerializer)
210 return new KafkaProducer<>(
211 defaultProducerProperties,
/**
 * Provides the serializer for {@code String} keys.
 *
 * @return a new {@link StringSerializer}
 */
217 StringSerializer stringSerializer()
219 return new StringSerializer();
/**
 * Provides the JSON serializer for {@code MessageTo} values.
 * <p>
 * NOTE(review): only the creation of the serializer is visible in this
 * excerpt - any further configuration and the return statement are not.
 */
223 JsonSerializer<MessageTo> chatMessageSerializer()
225 JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
230 Consumer<String, MessageTo> chatMessageChannelConsumer(
231 Properties defaultConsumerProperties,
232 StringDeserializer stringDeserializer,
233 JsonDeserializer<MessageTo> messageDeserializer)
235 Properties properties = new Properties(defaultConsumerProperties);
236 properties.setProperty(
237 ConsumerConfig.GROUP_ID_CONFIG,
238 "chat_message_channel");
239 return new KafkaConsumer<>(
242 messageDeserializer);
/**
 * Provides the deserializer for {@code String} keys.
 *
 * @return a new {@link StringDeserializer}
 */
246 StringDeserializer stringDeserializer()
248 return new StringDeserializer();
/**
 * Provides the JSON deserializer for {@code MessageTo} values.
 * <p>
 * NOTE(review): only the creation of the deserializer is visible in this
 * excerpt - any further configuration and the return statement are not.
 */
252 JsonDeserializer<MessageTo> chatMessageDeserializer()
254 JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
/**
 * Builds the base configuration, that is shared by the producers.
 * <p>
 * Sets {@code ProducerConfig.CLIENT_ID_CONFIG} and
 * {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG} from the Kafka section of
 * the given backend properties.
 * NOTE(review): the end of the method is not visible in this excerpt.
 *
 * @param chatBackendProperties the configuration of the chat backend
 */
259 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
261 Properties properties = new Properties();
262 properties.setProperty(
263 ProducerConfig.CLIENT_ID_CONFIG,
264 chatBackendProperties.getKafka().getClientId());
265 properties.setProperty(
266 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
267 chatBackendProperties.getKafka().getBootstrapServers());
/**
 * Builds the base configuration, that is shared by the consumers.
 * <p>
 * Sets {@code ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG} and
 * {@code ConsumerConfig.CLIENT_ID_CONFIG} from the Kafka section of the given
 * backend properties, and additionally configures
 * {@code ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} and
 * {@code ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}.
 * NOTE(review): the values of the last two settings and the end of the method
 * are not visible in this excerpt.
 *
 * @param chatBackendProperties the configuration of the chat backend
 */
272 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
274 Properties properties = new Properties();
275 properties.setProperty(
276 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
277 chatBackendProperties.getKafka().getBootstrapServers());
278 properties.setProperty(
279 ConsumerConfig.CLIENT_ID_CONFIG,
280 chatBackendProperties.getKafka().getClientId());
281 properties.setProperty(
282 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
284 properties.setProperty(
285 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
293 return ZoneId.systemDefault();