b11babc7ffda058ce7b84c9303ed3837ab532967
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import jakarta.annotation.PreDestroy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.boot.ApplicationArguments;
19 import org.springframework.boot.ApplicationRunner;
20 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
21 import org.springframework.context.ConfigurableApplicationContext;
22 import org.springframework.context.annotation.Bean;
23 import org.springframework.context.annotation.Configuration;
24 import org.springframework.kafka.support.serializer.JsonDeserializer;
25 import org.springframework.kafka.support.serializer.JsonSerializer;
26 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
27
28 import java.time.Clock;
29 import java.time.ZoneId;
30 import java.util.Properties;
31 import java.util.concurrent.CompletableFuture;
32
33
34 @ConditionalOnProperty(
35     prefix = "chat.backend",
36     name = "services",
37     havingValue = "kafka")
38 @Configuration
39 @Slf4j
40 public class KafkaServicesConfiguration implements ApplicationRunner
41 {
42   @Autowired
43   ThreadPoolTaskExecutor taskExecutor;
44   @Autowired
45   ConfigurableApplicationContext context;
46
47   @Autowired
48   ChatMessageChannel chatMessageChannel;
49   @Autowired
50   ChatRoomChannel chatRoomChannel;
51
52   CompletableFuture<Void> chatRoomChannelConsumerJob;
53   CompletableFuture<Void> chatMessageChannelConsumerJob;
54
55
56   @Override
57   public void run(ApplicationArguments args) throws Exception
58   {
59     log.info("Starting the consumer for the ChatRoomChannel");
60     chatRoomChannelConsumerJob = taskExecutor
61         .submitCompletable(chatRoomChannel)
62         .exceptionally(e ->
63         {
64           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
65           return null;
66         });
67     log.info("Starting the consumer for the ChatMessageChannel");
68     chatMessageChannelConsumerJob = taskExecutor
69         .submitCompletable(chatMessageChannel)
70         .exceptionally(e ->
71         {
72           log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
73           return null;
74         });
75   }
76
77   @PreDestroy
78   public void joinChatRoomChannelConsumerJob()
79   {
80     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
81     chatRoomChannelConsumerJob.join();
82     log.info("Joined the consumer of the ChatRoomChannel");
83   }
84
85   @PreDestroy
86   public void joinChatMessageChannelConsumerJob()
87   {
88     log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
89     chatMessageChannelConsumerJob.join();
90     log.info("Joined the consumer of the ChatMessageChannel");
91   }
92
93
94   @Bean
95   ChatHome kafkaChatHome(
96       ShardingStrategy shardingStrategy,
97       ChatMessageChannel chatMessageChannel)
98   {
99     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
100   }
101
102   @Bean
103   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
104   {
105     return new KafkaChatRoomFactory(chatRoomChannel);
106   }
107
108   @Bean
109   ChatRoomChannel chatRoomChannel(
110       ChatBackendProperties properties,
111       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
112       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
113       ShardingStrategy shardingStrategy,
114       ChatMessageChannel chatMessageChannel,
115       Clock clock)
116   {
117     return new ChatRoomChannel(
118         properties.getKafka().getTopic(),
119         chatRoomChannelProducer,
120         chatRoomChannelConsumer,
121         shardingStrategy,
122         chatMessageChannel,
123         clock,
124         properties.getChatroomBufferSize());
125   }
126
127   @Bean
128   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
129       Properties producerProperties,
130       IntegerSerializer integerSerializer,
131       JsonSerializer<ChatRoomTo> chatRoomSerializer)
132   {
133     return new KafkaProducer<>(
134         producerProperties,
135         integerSerializer,
136         chatRoomSerializer);
137   }
138
139   @Bean
140   IntegerSerializer integerSerializer()
141   {
142     return new IntegerSerializer();
143   }
144
145   @Bean
146   JsonSerializer<ChatRoomTo> chatRoomSerializer()
147   {
148     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
149     return serializer;
150   }
151
152   @Bean
153   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
154       Properties producerProperties,
155       IntegerDeserializer integerDeserializer,
156       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
157   {
158     return new KafkaConsumer<>(
159         producerProperties,
160         integerDeserializer,
161         chatRoomDeserializer);
162   }
163
164   @Bean
165   IntegerDeserializer integerDeserializer()
166   {
167     return new IntegerDeserializer();
168   }
169
170   @Bean
171   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
172   {
173     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
174     return deserializer;
175   }
176
177   @Bean
178   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
179   {
180     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
181   }
182
183   @Bean
184   ChatMessageChannel chatMessageChannel(
185       ChatBackendProperties properties,
186       Producer<String, MessageTo> chatMessageChannelProducer,
187       Consumer<String, MessageTo> chatMessageChannelConsumer,
188       ZoneId zoneId)
189   {
190     return new ChatMessageChannel(
191         properties.getKafka().getTopic(),
192         chatMessageChannelProducer,
193         chatMessageChannelConsumer,
194         zoneId,
195         properties.getKafka().getNumPartitions());
196   }
197
198   @Bean
199   Producer<String, MessageTo>  chatMessageChannelProducer(
200       Properties producerProperties,
201       StringSerializer stringSerializer,
202       JsonSerializer<MessageTo> messageSerializer)
203   {
204     return new KafkaProducer<>(
205         producerProperties,
206         stringSerializer,
207         messageSerializer);
208   }
209
210   @Bean
211   StringSerializer stringSerializer()
212   {
213     return new StringSerializer();
214   }
215
216   @Bean
217   JsonSerializer<MessageTo> chatMessageSerializer()
218   {
219     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
220     return serializer;
221   }
222
223   @Bean
224   Consumer<String, MessageTo>  chatMessageChannelConsumer(
225       Properties producerProperties,
226       StringDeserializer stringDeserializer,
227       JsonDeserializer<MessageTo> messageDeserializer)
228   {
229     return new KafkaConsumer<>(
230         producerProperties,
231         stringDeserializer,
232         messageDeserializer);
233   }
234
235   @Bean
236   StringDeserializer stringDeserializer()
237   {
238     return new StringDeserializer();
239   }
240
241   @Bean
242   JsonDeserializer<MessageTo> chatMessageDeserializer()
243   {
244     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
245     return deserializer;
246   }
247
248   @Bean
249   Properties producerProperties(ChatBackendProperties chatBackendProperties)
250   {
251     Properties properties = new Properties();
252     return properties;
253   }
254
255   @Bean
256   Properties consumerProperties(ChatBackendProperties chatBackendProperties)
257   {
258     Properties properties = new Properties();
259     return properties;
260   }
261
262   @Bean
263   ZoneId zoneId()
264   {
265     return ZoneId.systemDefault();
266   }
267 }