1dca04023df825ae14802943e13dc1ea548330e6
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.ChatHome;
9 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.context.annotation.Configuration;
14
15 import java.time.Clock;
16
17
18 @ConditionalOnProperty(
19     prefix = "chat.backend",
20     name = "services",
21     havingValue = "inmemory",
22     matchIfMissing = true)
23 @Configuration
24 public class InMemoryServicesConfiguration
25 {
26   @Bean
27   @ConditionalOnProperty(
28       prefix = "chat.backend.inmemory",
29       name = "sharding-strategy",
30       havingValue = "none",
31       matchIfMissing = true)
32   ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
33   {
34     return new SimpleChatHome(chatHomeService);
35   }
36
37   @Bean
38   @ConditionalOnProperty(
39       prefix = "chat.backend.inmemory",
40       name = "sharding-strategy",
41       havingValue = "kafkalike")
42   ChatHome kafkalikeShardingChatHome(
43       ChatBackendProperties properties,
44       InMemoryChatHomeService chatHomeService,
45       StorageStrategy storageStrategy)
46   {
47     int numShards = properties.getInmemory().getNumShards();
48     ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
49     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
50     storageStrategy
51         .read()
52         .subscribe(chatRoom ->
53         {
54           int shard = shardingStrategy.selectShard(chatRoom.getId());
55           if (chatHomes[shard] == null)
56             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
57         });
58     return new ShardedChatHome(chatHomes, shardingStrategy);
59   }
60
61   @Bean
62   InMemoryChatHomeService chatHomeService(
63       ChatBackendProperties properties,
64       ShardingStrategy shardingStrategy,
65       StorageStrategy storageStrategy)
66   {
67     ShardingStrategyType sharding =
68         properties.getInmemory().getShardingStrategy();
69     int numShards = sharding == ShardingStrategyType.none
70         ? 1
71         : properties.getInmemory().getNumShards();
72     int[] ownedShards = sharding == ShardingStrategyType.none
73         ? new int[] { 0 }
74         : properties.getInmemory().getOwnedShards();
75     return new InMemoryChatHomeService(
76         shardingStrategy,
77         numShards,
78         ownedShards,
79         storageStrategy.read());
80   }
81
82   @Bean
83   InMemoryChatRoomFactory chatRoomFactory(
84       ShardingStrategy strategy,
85       Clock clock,
86       ChatBackendProperties properties)
87   {
88     return new InMemoryChatRoomFactory(
89         strategy,
90         clock,
91         properties.getChatroomBufferSize());
92   }
93
94   @ConditionalOnProperty(
95       prefix = "chat.backend.inmemory",
96       name = "sharding-strategy",
97       havingValue = "none",
98       matchIfMissing = true)
99   @Bean
100   ShardingStrategy defaultShardingStrategy()
101   {
102     return chatRoomId -> 0;
103   }
104
105   @ConditionalOnProperty(
106       prefix = "chat.backend.inmemory",
107       name = "sharding-strategy",
108       havingValue = "kafkalike")
109   @Bean
110   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
111   {
112     return new KafkaLikeShardingStrategy(
113         properties.getInmemory().getNumShards());
114   }
115 }