WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
7 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 import java.time.Clock;
12 import java.util.stream.IntStream;
13
14
15 @ConditionalOnProperty(
16     prefix = "chat.backend",
17     name = "services",
18     havingValue = "inmemory",
19     matchIfMissing = true)
20 @Configuration
21 public class InMemoryServicesConfiguration
22 {
23   @Bean
24   @ConditionalOnProperty(
25       prefix = "chat.backend.inmemory",
26       name = "sharding-strategy",
27       havingValue = "none",
28       matchIfMissing = true)
29   ChatHome noneShardingChatHome()
30   {
31     return new SimpleChatHome();
32   }
33
34   @Bean
35   @ConditionalOnProperty(
36       prefix = "chat.backend.inmemory",
37       name = "sharding-strategy",
38       havingValue = "kafkalike")
39   ChatHome kafkalikeShardingChatHome(
40       ChatBackendProperties properties,
41       InMemoryChatHomeService chatHomeService)
42   {
43     int numShards = properties.getInmemory().getNumShards();
44     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
45     IntStream
46         .of(properties.getInmemory().getOwnedShards())
47         .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
48     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
49     return new ShardedChatHome(chatHomes, strategy);
50   }
51
52   @Bean
53   InMemoryChatHomeService chatHomeService(
54       ChatBackendProperties properties,
55       StorageStrategy storageStrategy)
56   {
57     ShardingStrategyType sharding =
58         properties.getInmemory().getShardingStrategy();
59     int numShards = sharding == ShardingStrategyType.none
60         ? 1
61         : properties.getInmemory().getNumShards();
62     int[] ownedShards = sharding == ShardingStrategyType.none
63         ? new int[] { 0 }
64         : properties.getInmemory().getOwnedShards();
65     return new InMemoryChatHomeService(
66         numShards,
67         ownedShards,
68         storageStrategy.read());
69   }
70
71   @Bean
72   InMemoryChatRoomFactory chatRoomFactory(
73       InMemoryChatHomeService service,
74       ShardingStrategy strategy,
75       Clock clock,
76       ChatBackendProperties properties)
77   {
78     return new InMemoryChatRoomFactory(
79         service,
80         strategy,
81         clock,
82         properties.getChatroomBufferSize());
83   }
84
85   @ConditionalOnProperty(
86       prefix = "chat.backend.inmemory",
87       name = "sharding-strategy",
88       havingValue = "none",
89       matchIfMissing = true)
90   @Bean
91   ShardingStrategy defaultShardingStrategy()
92   {
93     return chatRoomId -> 0;
94   }
95
96   @ConditionalOnProperty(
97       prefix = "chat.backend.inmemory",
98       name = "sharding-strategy",
99       havingValue = "kafkalike")
100   @Bean
101   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
102   {
103     return new KafkaLikeShardingStrategy(
104         properties.getInmemory().getNumShards());
105   }
106 }