NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.time.Clock;
import java.time.ZoneId;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
29
30
31 @ConditionalOnProperty(
32     prefix = "chat.backend",
33     name = "services",
34     havingValue = "kafka")
35 @Configuration
36 @Slf4j
37 public class KafkaServicesConfiguration implements ApplicationRunner
38 {
39   @Autowired
40   ThreadPoolTaskExecutor taskExecutor;
41   @Autowired
42   ConfigurableApplicationContext context;
43
44   @Autowired
45   ChatMessageChannel chatMessageChannel;
46
47   CompletableFuture<Optional<Exception>> chatRoomChannelConsumerJob;
48   CompletableFuture<Optional<Exception>> chatMessageChannelConsumerJob;
49
50
51   @Override
52   public void run(ApplicationArguments args) throws Exception
53   {
54     log.info("Starting the consumer for the ChatRoomChannel");
55     chatRoomChannelConsumerJob = taskExecutor.submitCompletable(chatMessageChannel);
56     chatRoomChannelConsumerJob.thenAccept(exceptionOptional ->
57     {
58       exceptionOptional.ifPresent();
59           log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
60           SpringApplication.exit(context, () -> exitStatus);
61         },
62         t ->
63         {
64           log.error("SimpleConsumer exited abnormally!", t);
65           SpringApplication.exit(context, () -> 2);
66         });
67   }
68
69   @PreDestroy
70   public void shutdown() throws ExecutionException, InterruptedException
71   {
72     log.info("Signaling SimpleConsumer to quit its work");
73     kafkaConsumer.wakeup();
74     log.info("Waiting for SimpleConsumer to finish its work");
75     consumerJob.get();
76     log.info("SimpleConsumer finished its work");
77   }
78
79
80   @Bean
81   ChatHome kafkaChatHome(
82       ShardingStrategy shardingStrategy,
83       ChatMessageChannel chatMessageChannel)
84   {
85     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
86   }
87
88   @Bean
89   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
90   {
91     return new KafkaChatRoomFactory(chatRoomChannel);
92   }
93
94   @Bean
95   ChatRoomChannel chatRoomChannel(
96       ChatBackendProperties properties,
97       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
98       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
99       ShardingStrategy shardingStrategy,
100       ChatMessageChannel chatMessageChannel,
101       Clock clock)
102   {
103     return new ChatRoomChannel(
104         properties.getKafka().getTopic(),
105         chatRoomChannelProducer,
106         chatRoomChannelConsumer,
107         shardingStrategy,
108         chatMessageChannel,
109         clock,
110         properties.getChatroomBufferSize());
111   }
112
113   @Bean
114   ChatMessageChannel chatMessageChannel(
115       ChatBackendProperties properties,
116       Producer<String, MessageTo> chatMessageChannelProducer,
117       Consumer<String, MessageTo> chatMessageChannelConsumer,
118       ZoneId zoneId)
119   {
120     return new ChatMessageChannel(
121         properties.getKafka().getTopic(),
122         chatMessageChannelProducer,
123         chatMessageChannelConsumer,
124         zoneId,
125         properties.getKafka().getNumPartitions());
126   }
127
128   @Bean
129   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
130   {
131     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
132   }
133
134   @Bean
135   ZoneId zoneId()
136   {
137     return ZoneId.systemDefault();
138   }
139 }