refactor: Refined the creation of new `ChatRoom`s
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.ChatHome;
9 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.context.annotation.Configuration;
14
15 import java.time.Clock;
16
17
18 @ConditionalOnProperty(
19     prefix = "chat.backend",
20     name = "services",
21     havingValue = "inmemory",
22     matchIfMissing = true)
23 @Configuration
24 public class InMemoryServicesConfiguration
25 {
26   @Bean
27   @ConditionalOnProperty(
28       prefix = "chat.backend.inmemory",
29       name = "sharding-strategy",
30       havingValue = "none",
31       matchIfMissing = true)
32   ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
33   {
34     return new SimpleChatHome(chatHomeService);
35   }
36
37   @Bean
38   @ConditionalOnProperty(
39       prefix = "chat.backend.inmemory",
40       name = "sharding-strategy",
41       havingValue = "kafkalike")
42   ChatHome kafkalikeShardingChatHome(
43       ChatBackendProperties properties,
44       InMemoryChatHomeService chatHomeService,
45       StorageStrategy storageStrategy)
46   {
47     int numShards = properties.getInmemory().getNumShards();
48     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
49     storageStrategy
50         .read()
51         .subscribe(chatRoom ->
52         {
53           int shard = chatRoom.getShard();
54           if (chatHomes[shard] == null)
55             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
56         });
57     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
58     return new ShardedChatHome(chatHomes, strategy);
59   }
60
61   @Bean
62   InMemoryChatHomeService chatHomeService(
63       ChatBackendProperties properties,
64       StorageStrategy storageStrategy)
65   {
66     ShardingStrategyType sharding =
67         properties.getInmemory().getShardingStrategy();
68     int numShards = sharding == ShardingStrategyType.none
69         ? 1
70         : properties.getInmemory().getNumShards();
71     int[] ownedShards = sharding == ShardingStrategyType.none
72         ? new int[] { 0 }
73         : properties.getInmemory().getOwnedShards();
74     return new InMemoryChatHomeService(
75         numShards,
76         ownedShards,
77         storageStrategy.read());
78   }
79
80   @Bean
81   InMemoryChatRoomFactory chatRoomFactory(
82       InMemoryChatHomeService service,
83       ShardingStrategy strategy,
84       Clock clock,
85       ChatBackendProperties properties)
86   {
87     return new InMemoryChatRoomFactory(
88         service,
89         strategy,
90         clock,
91         properties.getChatroomBufferSize());
92   }
93
94   @ConditionalOnProperty(
95       prefix = "chat.backend.inmemory",
96       name = "sharding-strategy",
97       havingValue = "none",
98       matchIfMissing = true)
99   @Bean
100   ShardingStrategy defaultShardingStrategy()
101   {
102     return chatRoomId -> 0;
103   }
104
105   @ConditionalOnProperty(
106       prefix = "chat.backend.inmemory",
107       name = "sharding-strategy",
108       havingValue = "kafkalike")
109   @Bean
110   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
111   {
112     return new KafkaLikeShardingStrategy(
113         properties.getInmemory().getNumShards());
114   }
115 }