89a3c11129035a91b86cc297e3b0b20b6e743650
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
7 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 import java.time.Clock;
12 import java.util.stream.IntStream;
13
14
15 @ConditionalOnProperty(
16     prefix = "chat.backend",
17     name = "services",
18     havingValue = "inmemory",
19     matchIfMissing = true)
20 @Configuration
21 public class InMemoryServicesConfiguration
22 {
23   @Bean
24   @ConditionalOnProperty(
25       prefix = "chat.backend.inmemory",
26       name = "sharding-strategy",
27       havingValue = "none",
28       matchIfMissing = true)
29   ChatHomeService noneShardingChatHome(
30       ChatBackendProperties properties,
31       StorageStrategy storageStrategy,
32       Clock clock)
33   {
34     return new SimpleChatHomeService(
35         storageStrategy,
36         clock,
37         properties.getChatroomBufferSize(),
38         properties.getInstanceUri());
39   }
40
41   @Bean
42   @ConditionalOnProperty(
43       prefix = "chat.backend.inmemory",
44       name = "sharding-strategy",
45       havingValue = "kafkalike")
46   ChatHomeService kafkalikeShardingChatHome(
47       ChatBackendProperties properties,
48       StorageStrategy storageStrategy,
49       Clock clock)
50   {
51     int numShards = properties.getInmemory().getNumShards();
52     SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
53     IntStream
54         .of(properties.getInmemory().getOwnedShards())
55         .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
56             shard,
57             storageStrategy,
58             clock,
59             properties.getChatroomBufferSize()));
60     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
61     return new ShardedChatHomeService(chatHomes, strategy);
62   }
63
64   @ConditionalOnProperty(
65       prefix = "chat.backend.inmemory",
66       name = "sharding-strategy",
67       havingValue = "none",
68       matchIfMissing = true)
69   @Bean
70   ShardingStrategy defaultShardingStrategy()
71   {
72     return chatRoomId -> 0;
73   }
74
75   @ConditionalOnProperty(
76       prefix = "chat.backend.inmemory",
77       name = "sharding-strategy",
78       havingValue = "kafkalike")
79   @Bean
80   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
81   {
82     return new KafkaLikeShardingStrategy(
83         properties.getInmemory().getNumShards());
84   }
85 }