5b5785ea54bb2bc79c730cf357aaa5c3fbc886f6
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
7 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 import java.time.Clock;
12 import java.util.stream.IntStream;
13
14
15 @ConditionalOnProperty(
16     prefix = "chat.backend",
17     name = "services",
18     havingValue = "inmemory",
19     matchIfMissing = true)
20 @Configuration
21 public class InMemoryServicesConfiguration
22 {
23   @Bean
24   @ConditionalOnProperty(
25       prefix = "chat.backend.inmemory",
26       name = "sharding-strategy",
27       havingValue = "none",
28       matchIfMissing = true)
29   ChatHomeService noneShardingChatHome(
30       ChatBackendProperties properties,
31       StorageStrategy storageStrategy,
32       Clock clock)
33   {
34     return new SimpleChatHomeService(
35         storageStrategy,
36         clock,
37         properties.getChatroomBufferSize());
38   }
39
40   @Bean
41   @ConditionalOnProperty(
42       prefix = "chat.backend.inmemory",
43       name = "sharding-strategy",
44       havingValue = "kafkalike")
45   ChatHomeService kafkalikeShardingChatHome(
46       ChatBackendProperties properties,
47       StorageStrategy storageStrategy,
48       Clock clock)
49   {
50     int numShards = properties.getInmemory().getNumShards();
51     SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
52     IntStream
53         .of(properties.getInmemory().getOwnedShards())
54         .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
55             shard,
56             storageStrategy,
57             clock,
58             properties.getChatroomBufferSize()));
59     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
60     return new ShardedChatHomeService(
61         properties.getInstanceId(),
62         chatHomes,
63         properties.getInmemory().getShardOwners(),
64         strategy);
65   }
66
67   @ConditionalOnProperty(
68       prefix = "chat.backend.inmemory",
69       name = "sharding-strategy",
70       havingValue = "none",
71       matchIfMissing = true)
72   @Bean
73   ShardingStrategy defaultShardingStrategy()
74   {
75     return chatRoomId -> 0;
76   }
77
78   @ConditionalOnProperty(
79       prefix = "chat.backend.inmemory",
80       name = "sharding-strategy",
81       havingValue = "kafkalike")
82   @Bean
83   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
84   {
85     return new KafkaLikeShardingStrategy(
86         properties.getInmemory().getNumShards());
87   }
88 }