feat: Reintroduced `ChatRoom.shard`, becaus it is needed as a routing-hint
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.ChatHome;
9 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.context.annotation.Configuration;
14
15 import java.time.Clock;
16
17
18 @ConditionalOnProperty(
19     prefix = "chat.backend",
20     name = "services",
21     havingValue = "inmemory",
22     matchIfMissing = true)
23 @Configuration
24 public class InMemoryServicesConfiguration
25 {
26   @Bean
27   @ConditionalOnProperty(
28       prefix = "chat.backend.inmemory",
29       name = "sharding-strategy",
30       havingValue = "none",
31       matchIfMissing = true)
32   ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
33   {
34     return new SimpleChatHome(chatHomeService);
35   }
36
37   @Bean
38   @ConditionalOnProperty(
39       prefix = "chat.backend.inmemory",
40       name = "sharding-strategy",
41       havingValue = "kafkalike")
42   ChatHome kafkalikeShardingChatHome(
43       ChatBackendProperties properties,
44       InMemoryChatHomeService chatHomeService,
45       StorageStrategy storageStrategy)
46   {
47     int numShards = properties.getInmemory().getNumShards();
48     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
49     storageStrategy
50         .read()
51         .subscribe(chatRoom ->
52         {
53           int shard = chatRoom.getShard();
54           if (chatHomes[shard] == null)
55             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
56         });
57     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
58     return new ShardedChatHome(chatHomes, strategy);
59   }
60
61   @Bean
62   InMemoryChatHomeService chatHomeService(
63       ChatBackendProperties properties,
64       StorageStrategy storageStrategy)
65   {
66     ShardingStrategyType sharding =
67         properties.getInmemory().getShardingStrategy();
68     int numShards = sharding == ShardingStrategyType.none
69         ? 1
70         : properties.getInmemory().getNumShards();
71     int[] ownedShards = sharding == ShardingStrategyType.none
72         ? new int[] { 0 }
73         : properties.getInmemory().getOwnedShards();
74     return new InMemoryChatHomeService(
75         numShards,
76         ownedShards,
77         storageStrategy.read());
78   }
79
80   @Bean
81   InMemoryChatRoomFactory chatRoomFactory(
82       ShardingStrategy strategy,
83       Clock clock,
84       ChatBackendProperties properties)
85   {
86     return new InMemoryChatRoomFactory(
87         strategy,
88         clock,
89         properties.getChatroomBufferSize());
90   }
91
92   @ConditionalOnProperty(
93       prefix = "chat.backend.inmemory",
94       name = "sharding-strategy",
95       havingValue = "none",
96       matchIfMissing = true)
97   @Bean
98   ShardingStrategy defaultShardingStrategy()
99   {
100     return chatRoomId -> 0;
101   }
102
103   @ConditionalOnProperty(
104       prefix = "chat.backend.inmemory",
105       name = "sharding-strategy",
106       havingValue = "kafkalike")
107   @Bean
108   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
109   {
110     return new KafkaLikeShardingStrategy(
111         properties.getInmemory().getNumShards());
112   }
113 }