1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.ChatHome;
9 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.context.annotation.Configuration;
15 import java.time.Clock;
16 import java.util.stream.IntStream;
/**
 * Groups the factory methods for the in-memory flavour of the chat-backend
 * services (chat home, chat-home service, chat-room factory and sharding
 * strategy).
 * <p>
 * The class-level condition is evaluated below the {@code chat.backend}
 * prefix and expects the value {@code inmemory}; because of
 * {@code matchIfMissing = true} it also matches when the property is not
 * set at all.
 * <p>
 * NOTE(review): the {@code name} attribute of the condition and a
 * {@code @Configuration} annotation are not visible in this listing (both
 * annotation types are imported) -- confirm against the complete file.
 */
19 @ConditionalOnProperty(
20 prefix = "chat.backend",
22 havingValue = "inmemory",
23 matchIfMissing = true)
25 public class InMemoryServicesConfiguration
/**
 * Provides the {@link ChatHome} as a single {@link SimpleChatHome} that
 * wraps the given service.
 * <p>
 * Guarded by the property {@code chat.backend.inmemory.sharding-strategy};
 * {@code matchIfMissing = true} makes this variant apply when the property
 * is not set at all.
 * <p>
 * NOTE(review): the {@code havingValue} of the condition is not visible in
 * this listing; the method name suggests {@code none} -- confirm.
 *
 * @param chatHomeService the in-memory service handed to the {@link SimpleChatHome}
 * @return a {@link SimpleChatHome} created from {@code chatHomeService}
 */
28 @ConditionalOnProperty(
29 prefix = "chat.backend.inmemory",
30 name = "sharding-strategy",
32 matchIfMissing = true)
33 ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
35 return new SimpleChatHome(chatHomeService);
/**
 * Provides the {@link ChatHome} as a {@link ShardedChatHome}, selected when
 * {@code chat.backend.inmemory.sharding-strategy} is set to
 * {@code kafkalike}.
 * <p>
 * One {@link SimpleChatHome} is created for every shard listed in the
 * configured owned shards; all of them share the same
 * {@code chatHomeService}. The method builds its own
 * {@link KafkaLikeShardingStrategy} from the configured number of shards
 * instead of receiving a strategy as a parameter.
 *
 * @param properties supplies {@code getInmemory().getNumShards()} and
 *                   {@code getInmemory().getOwnedShards()}
 * @param chatHomeService the service shared by all created {@link SimpleChatHome}s
 * @return a {@link ShardedChatHome} built from the per-shard array and the strategy
 */
39 @ConditionalOnProperty(
40 prefix = "chat.backend.inmemory",
41 name = "sharding-strategy",
42 havingValue = "kafkalike")
43 ChatHome kafkalikeShardingChatHome(
44 ChatBackendProperties properties,
45 InMemoryChatHomeService chatHomeService)
47 int numShards = properties.getInmemory().getNumShards();
// One slot per shard. Only the slots of the owned shards are filled below;
// every other slot keeps the array default (null).
48 SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
// NOTE(review): the receiver of .of(...) is not visible in this listing --
// presumably IntStream (it is imported); confirm.
// NOTE(review): the array store is not guarded, so an owned shard index
// outside 0..numShards-1 fails with ArrayIndexOutOfBoundsException.
50 .of(properties.getInmemory().getOwnedShards())
51 .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
52 ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
53 return new ShardedChatHome(chatHomes, strategy);
/**
 * Creates the {@link InMemoryChatHomeService}.
 * <p>
 * The number of shards and the owned shards are taken from the in-memory
 * properties unless the configured sharding strategy is
 * {@link ShardingStrategyType#none}; the result of
 * {@code storageStrategy.read()} is passed as the last constructor argument.
 * <p>
 * NOTE(review): the values used for {@code ShardingStrategyType.none}
 * (first branch of both conditional expressions) and the leading
 * constructor arguments are not visible in this listing -- confirm against
 * the complete file.
 *
 * @param properties supplies the sharding strategy type, the number of
 *                   shards and the owned shards
 * @param storageStrategy its {@code read()} result is handed to the service
 * @return the newly created {@link InMemoryChatHomeService}
 */
57 InMemoryChatHomeService chatHomeService(
58 ChatBackendProperties properties,
59 StorageStrategy storageStrategy)
61 ShardingStrategyType sharding =
62 properties.getInmemory().getShardingStrategy();
// Configured shard count is only used when sharding is not "none".
63 int numShards = sharding == ShardingStrategyType.none
65 : properties.getInmemory().getNumShards();
// Same distinction for the list of shards owned by this instance.
66 int[] ownedShards = sharding == ShardingStrategyType.none
68 : properties.getInmemory().getOwnedShards();
69 return new InMemoryChatHomeService(
72 storageStrategy.read());
/**
 * Creates the {@link InMemoryChatRoomFactory}; the configured chat-room
 * buffer size ({@code properties.getChatroomBufferSize()}) is passed as the
 * last constructor argument.
 * <p>
 * NOTE(review): one parameter (between {@code strategy} and
 * {@code properties}) and the leading constructor arguments are not visible
 * in this listing -- presumably a {@code Clock} (it is imported) and the
 * parameters in declaration order; confirm against the complete file.
 *
 * @param service the in-memory chat-home service
 * @param strategy the sharding strategy
 * @param properties supplies the chat-room buffer size
 * @return the newly created {@link InMemoryChatRoomFactory}
 */
76 InMemoryChatRoomFactory chatRoomFactory(
77 InMemoryChatHomeService service,
78 ShardingStrategy strategy,
80 ChatBackendProperties properties)
82 return new InMemoryChatRoomFactory(
86 properties.getChatroomBufferSize());
/**
 * Provides a {@link ShardingStrategy} that returns the constant {@code 0}
 * for every {@code chatRoomId} (presumably the shard index -- confirm
 * against the {@link ShardingStrategy} interface).
 * <p>
 * Guarded by the property {@code chat.backend.inmemory.sharding-strategy};
 * {@code matchIfMissing = true} makes this variant apply when the property
 * is not set at all.
 * <p>
 * NOTE(review): the {@code havingValue} of the condition is not visible in
 * this listing -- confirm.
 *
 * @return a strategy implemented as the lambda {@code chatRoomId -> 0}
 */
89 @ConditionalOnProperty(
90 prefix = "chat.backend.inmemory",
91 name = "sharding-strategy",
93 matchIfMissing = true)
95 ShardingStrategy defaultShardingStrategy()
97 return chatRoomId -> 0;
/**
 * Provides a {@link KafkaLikeShardingStrategy} as the
 * {@link ShardingStrategy}, selected when
 * {@code chat.backend.inmemory.sharding-strategy} is set to
 * {@code kafkalike}.
 *
 * @param properties supplies the number of shards via
 *                   {@code getInmemory().getNumShards()}
 * @return a {@link KafkaLikeShardingStrategy} created with the configured
 *         number of shards
 */
100 @ConditionalOnProperty(
101 prefix = "chat.backend.inmemory",
102 name = "sharding-strategy",
103 havingValue = "kafkalike")
105 ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
107 return new KafkaLikeShardingStrategy(
108 properties.getInmemory().getNumShards());