efc86b9e277314dd43e66381e7c07b95aa72c7f3
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import jakarta.annotation.PreDestroy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.serialization.IntegerDeserializer;
16 import org.apache.kafka.common.serialization.IntegerSerializer;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.boot.ApplicationArguments;
21 import org.springframework.boot.ApplicationRunner;
22 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
23 import org.springframework.context.ConfigurableApplicationContext;
24 import org.springframework.context.annotation.Bean;
25 import org.springframework.context.annotation.Configuration;
26 import org.springframework.kafka.support.serializer.JsonDeserializer;
27 import org.springframework.kafka.support.serializer.JsonSerializer;
28 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
29
30 import java.time.Clock;
31 import java.time.ZoneId;
32 import java.util.Properties;
33 import java.util.concurrent.CompletableFuture;
34
35
36 @ConditionalOnProperty(
37     prefix = "chat.backend",
38     name = "services",
39     havingValue = "kafka")
40 @Configuration
41 @Slf4j
42 public class KafkaServicesConfiguration implements ApplicationRunner
43 {
44   @Autowired
45   ThreadPoolTaskExecutor taskExecutor;
46   @Autowired
47   ConfigurableApplicationContext context;
48
49   @Autowired
50   ChatMessageChannel chatMessageChannel;
51   @Autowired
52   ChatRoomChannel chatRoomChannel;
53
54   CompletableFuture<Void> chatRoomChannelConsumerJob;
55   CompletableFuture<Void> chatMessageChannelConsumerJob;
56
57
58   @Override
59   public void run(ApplicationArguments args) throws Exception
60   {
61     log.info("Starting the consumer for the ChatRoomChannel");
62     chatRoomChannelConsumerJob = taskExecutor
63         .submitCompletable(chatRoomChannel)
64         .exceptionally(e ->
65         {
66           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
67           return null;
68         });
69     log.info("Starting the consumer for the ChatMessageChannel");
70     chatMessageChannelConsumerJob = taskExecutor
71         .submitCompletable(chatMessageChannel)
72         .exceptionally(e ->
73         {
74           log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
75           return null;
76         });
77   }
78
79   @PreDestroy
80   public void joinChatRoomChannelConsumerJob()
81   {
82     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
83     chatRoomChannelConsumerJob.join();
84     log.info("Joined the consumer of the ChatRoomChannel");
85   }
86
87   @PreDestroy
88   public void joinChatMessageChannelConsumerJob()
89   {
90     log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
91     chatMessageChannelConsumerJob.join();
92     log.info("Joined the consumer of the ChatMessageChannel");
93   }
94
95
96   @Bean
97   ChatHome kafkaChatHome(
98       ShardingStrategy shardingStrategy,
99       ChatMessageChannel chatMessageChannel)
100   {
101     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
102   }
103
104   @Bean
105   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
106   {
107     return new KafkaChatRoomFactory(chatRoomChannel);
108   }
109
110   @Bean
111   ChatRoomChannel chatRoomChannel(
112       ChatBackendProperties properties,
113       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
114       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
115       ShardingStrategy shardingStrategy,
116       ChatMessageChannel chatMessageChannel,
117       Clock clock)
118   {
119     return new ChatRoomChannel(
120         properties.getKafka().getTopic(),
121         chatRoomChannelProducer,
122         chatRoomChannelConsumer,
123         shardingStrategy,
124         chatMessageChannel,
125         clock,
126         properties.getChatroomBufferSize());
127   }
128
129   @Bean
130   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
131       Properties defaultProducerProperties,
132       IntegerSerializer integerSerializer,
133       JsonSerializer<ChatRoomTo> chatRoomSerializer)
134   {
135     return new KafkaProducer<>(
136         defaultProducerProperties,
137         integerSerializer,
138         chatRoomSerializer);
139   }
140
141   @Bean
142   IntegerSerializer integerSerializer()
143   {
144     return new IntegerSerializer();
145   }
146
147   @Bean
148   JsonSerializer<ChatRoomTo> chatRoomSerializer()
149   {
150     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
151     return serializer;
152   }
153
154   @Bean
155   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
156       Properties defaultConsumerProperties,
157       IntegerDeserializer integerDeserializer,
158       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
159   {
160     Properties properties = new Properties(defaultConsumerProperties);
161     properties.setProperty(
162         ConsumerConfig.GROUP_ID_CONFIG,
163         "chat_room_channel");
164     return new KafkaConsumer<>(
165         properties,
166         integerDeserializer,
167         chatRoomDeserializer);
168   }
169
170   @Bean
171   IntegerDeserializer integerDeserializer()
172   {
173     return new IntegerDeserializer();
174   }
175
176   @Bean
177   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
178   {
179     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
180     return deserializer;
181   }
182
183   @Bean
184   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
185   {
186     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
187   }
188
189   @Bean
190   ChatMessageChannel chatMessageChannel(
191       ChatBackendProperties properties,
192       Producer<String, MessageTo> chatMessageChannelProducer,
193       Consumer<String, MessageTo> chatMessageChannelConsumer,
194       ZoneId zoneId)
195   {
196     return new ChatMessageChannel(
197         properties.getKafka().getTopic(),
198         chatMessageChannelProducer,
199         chatMessageChannelConsumer,
200         zoneId,
201         properties.getKafka().getNumPartitions());
202   }
203
204   @Bean
205   Producer<String, MessageTo>  chatMessageChannelProducer(
206       Properties defaultProducerProperties,
207       StringSerializer stringSerializer,
208       JsonSerializer<MessageTo> messageSerializer)
209   {
210     return new KafkaProducer<>(
211         defaultProducerProperties,
212         stringSerializer,
213         messageSerializer);
214   }
215
216   @Bean
217   StringSerializer stringSerializer()
218   {
219     return new StringSerializer();
220   }
221
222   @Bean
223   JsonSerializer<MessageTo> chatMessageSerializer()
224   {
225     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
226     return serializer;
227   }
228
229   @Bean
230   Consumer<String, MessageTo>  chatMessageChannelConsumer(
231       Properties defaultConsumerProperties,
232       StringDeserializer stringDeserializer,
233       JsonDeserializer<MessageTo> messageDeserializer)
234   {
235     Properties properties = new Properties(defaultConsumerProperties);
236     properties.setProperty(
237         ConsumerConfig.GROUP_ID_CONFIG,
238         "chat_message_channel");
239     return new KafkaConsumer<>(
240         properties,
241         stringDeserializer,
242         messageDeserializer);
243   }
244
245   @Bean
246   StringDeserializer stringDeserializer()
247   {
248     return new StringDeserializer();
249   }
250
251   @Bean
252   JsonDeserializer<MessageTo> chatMessageDeserializer()
253   {
254     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
255     return deserializer;
256   }
257
258   @Bean
259   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
260   {
261     Properties properties = new Properties();
262     properties.setProperty(
263         ProducerConfig.CLIENT_ID_CONFIG,
264         chatBackendProperties.getKafka().getClientId());
265     properties.setProperty(
266         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
267         chatBackendProperties.getKafka().getBootstrapServers());
268     return properties;
269   }
270
271   @Bean
272   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
273   {
274     Properties properties = new Properties();
275     properties.setProperty(
276         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
277         chatBackendProperties.getKafka().getBootstrapServers());
278     properties.setProperty(
279         ConsumerConfig.CLIENT_ID_CONFIG,
280         chatBackendProperties.getKafka().getClientId());
281     properties.setProperty(
282         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
283         "false");
284     properties.setProperty(
285         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
286         "earliest");
287     return properties;
288   }
289
290   @Bean
291   ZoneId zoneId()
292   {
293     return ZoneId.systemDefault();
294   }
295 }