refactor: Pushed sharding one layer down in the architecture
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
6 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
7 import de.juplo.kafka.chat.backend.domain.ChatHome;
8 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
9 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.context.annotation.Configuration;
12
13 import java.time.Clock;
14
15
16 @ConditionalOnProperty(
17     prefix = "chat.backend",
18     name = "services",
19     havingValue = "inmemory",
20     matchIfMissing = true)
21 @Configuration
22 public class InMemoryServicesConfiguration
23 {
24   @Bean
25   ChatHome chatHome(InMemoryChatHomeService chatHomeService)
26   {
27     return new ChatHome(chatHomeService);
28   }
29
30   @Bean
31   InMemoryChatHomeService chatHomeService(
32       ChatBackendProperties properties,
33       StorageStrategy storageStrategy)
34   {
35     ShardingStrategyType sharding =
36         properties.getInmemory().getShardingStrategy();
37
38     int numShards;
39     int[] ownedShards;
40     ShardingStrategy shardingStrategy;
41
42     switch (sharding)
43     {
44       case none:
45         numShards = 1;
46         ownedShards = new int[] { 0 };
47         shardingStrategy = id -> 0;
48         break;
49       case kafkalike:
50         numShards = properties.getInmemory().getNumShards();
51         ownedShards = properties.getInmemory().getOwnedShards();
52         shardingStrategy = new KafkaLikeShardingStrategy(numShards);
53         break;
54       default:
55         throw new IllegalArgumentException("Unknown sharding strategy: " + sharding);
56     }
57
58     return new InMemoryChatHomeService(
59         numShards,
60         ownedShards,
61         shardingStrategy,
62         storageStrategy.read());
63   }
64
65   @Bean
66   InMemoryChatRoomFactory chatRoomFactory(
67       InMemoryChatHomeService service,
68       ShardingStrategy strategy,
69       Clock clock,
70       ChatBackendProperties properties)
71   {
72     return new InMemoryChatRoomFactory(
73         service,
74         strategy,
75         clock,
76         properties.getChatroomBufferSize());
77   }
78
79   @ConditionalOnProperty(
80       prefix = "chat.backend.inmemory",
81       name = "sharding-strategy",
82       havingValue = "none",
83       matchIfMissing = true)
84   @Bean
85   ShardingStrategy defaultShardingStrategy()
86   {
87     return chatRoomId -> 0;
88   }
89
90   @ConditionalOnProperty(
91       prefix = "chat.backend.inmemory",
92       name = "sharding-strategy",
93       havingValue = "kafkalike")
94   @Bean
95   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
96   {
97     return new KafkaLikeShardingStrategy(
98         properties.getInmemory().getNumShards());
99   }
100 }