5367fdf0841c873feda731b860add63fcf2b4a16
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
12 import jakarta.annotation.PreDestroy;
13 import lombok.extern.slf4j.Slf4j;
14 import org.apache.kafka.clients.consumer.Consumer;
15 import org.apache.kafka.clients.producer.Producer;
16 import org.springframework.beans.factory.annotation.Autowired;
17 import org.springframework.boot.ApplicationArguments;
18 import org.springframework.boot.ApplicationRunner;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.ConfigurableApplicationContext;
21 import org.springframework.context.annotation.Bean;
22 import org.springframework.context.annotation.Configuration;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import org.springframework.util.concurrent.ListenableFuture;
25
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.Optional;
29 import java.util.concurrent.CompletableFuture;
30
31
32 @ConditionalOnProperty(
33     prefix = "chat.backend",
34     name = "services",
35     havingValue = "kafka")
36 @Configuration
37 @Slf4j
38 public class KafkaServicesConfiguration implements ApplicationRunner
39 {
40   @Autowired
41   ThreadPoolTaskExecutor taskExecutor;
42   @Autowired
43   ConfigurableApplicationContext context;
44
45   @Autowired
46   ChatMessageChannel chatMessageChannel;
47   @Autowired
48   ChatRoomChannel chatRoomChannel;
49
50   CompletableFuture<Void> chatRoomChannelConsumerJob;
51   CompletableFuture<Void> chatMessageChannelConsumerJob;
52
53
54   @Override
55   public void run(ApplicationArguments args) throws Exception
56   {
57     log.info("Starting the consumer for the ChatRoomChannel");
58     chatRoomChannelConsumerJob = taskExecutor
59         .submitCompletable(chatRoomChannel)
60         .exceptionally(e ->
61         {
62           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
63           return null;
64         });
65     log.info("Starting the consumer for the ChatMessageChannel");
66     chatMessageChannelConsumerJob = taskExecutor
67         .submitCompletable(chatMessageChannel)
68         .exceptionally(e ->
69         {
70           log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
71           return null;
72         });
73   }
74
75   @PreDestroy
76   public void joinChatRoomChannelConsumerJob()
77   {
78     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
79     chatRoomChannelConsumerJob.join();
80     log.info("Joined the consumer of the ChatRoomChannel");
81   }
82
83   @PreDestroy
84   public void joinChatMessageChannelConsumerJob()
85   {
86     log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
87     chatMessageChannelConsumerJob.join();
88     log.info("Joined the consumer of the ChatMessageChannel");
89   }
90
91
92   @Bean
93   ChatHome kafkaChatHome(
94       ShardingStrategy shardingStrategy,
95       ChatMessageChannel chatMessageChannel)
96   {
97     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
98   }
99
100   @Bean
101   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
102   {
103     return new KafkaChatRoomFactory(chatRoomChannel);
104   }
105
106   @Bean
107   ChatRoomChannel chatRoomChannel(
108       ChatBackendProperties properties,
109       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
110       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
111       ShardingStrategy shardingStrategy,
112       ChatMessageChannel chatMessageChannel,
113       Clock clock)
114   {
115     return new ChatRoomChannel(
116         properties.getKafka().getTopic(),
117         chatRoomChannelProducer,
118         chatRoomChannelConsumer,
119         shardingStrategy,
120         chatMessageChannel,
121         clock,
122         properties.getChatroomBufferSize());
123   }
124
125   @Bean
126   ChatMessageChannel chatMessageChannel(
127       ChatBackendProperties properties,
128       Producer<String, MessageTo> chatMessageChannelProducer,
129       Consumer<String, MessageTo> chatMessageChannelConsumer,
130       ZoneId zoneId)
131   {
132     return new ChatMessageChannel(
133         properties.getKafka().getTopic(),
134         chatMessageChannelProducer,
135         chatMessageChannelConsumer,
136         zoneId,
137         properties.getKafka().getNumPartitions());
138   }
139
140   @Bean
141   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
142   {
143     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
144   }
145
146   @Bean
147   ZoneId zoneId()
148   {
149     return ZoneId.systemDefault();
150   }
151 }