518cf41b406729ae85a0b7205cd5464fd871c4a6
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
7 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 import java.time.Clock;
12 import java.util.stream.IntStream;
13
14
15 @ConditionalOnProperty(
16     prefix = "chat.backend",
17     name = "services",
18     havingValue = "inmemory",
19     matchIfMissing = true)
20 @Configuration
21 public class InMemoryServicesConfiguration
22 {
23   @Bean
24   @ConditionalOnProperty(
25       prefix = "chat.backend.inmemory",
26       name = "sharding-strategy",
27       havingValue = "none",
28       matchIfMissing = true)
29   ChatHomeService noneShardingChatHome(
30       ChatBackendProperties properties,
31       StorageStrategy storageStrategy,
32       Clock clock)
33   {
34     SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
35         clock,
36         properties.getChatroomBufferSize());
37     chatHomeService.restore(storageStrategy).block();
38     return chatHomeService;
39   }
40
41   @Bean
42   @ConditionalOnProperty(
43       prefix = "chat.backend.inmemory",
44       name = "sharding-strategy",
45       havingValue = "kafkalike")
46   ChatHomeService kafkalikeShardingChatHome(
47       ChatBackendProperties properties,
48       StorageStrategy storageStrategy,
49       Clock clock)
50   {
51     int numShards = properties.getInmemory().getNumShards();
52     SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
53     IntStream
54         .of(properties.getInmemory().getOwnedShards())
55         .forEach(shard ->
56         {
57           SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
58               shard,
59               clock,
60               properties.getChatroomBufferSize());
61           service.restore(storageStrategy).block();
62         });
63     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
64     return new ShardedChatHomeService(
65         properties.getInstanceId(),
66         chatHomes,
67         properties.getInmemory().getShardOwners(),
68         strategy);
69   }
70
71   @ConditionalOnProperty(
72       prefix = "chat.backend.inmemory",
73       name = "sharding-strategy",
74       havingValue = "none",
75       matchIfMissing = true)
76   @Bean
77   ShardingStrategy defaultShardingStrategy()
78   {
79     return chatRoomId -> 0;
80   }
81
82   @ConditionalOnProperty(
83       prefix = "chat.backend.inmemory",
84       name = "sharding-strategy",
85       havingValue = "kafkalike")
86   @Bean
87   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
88   {
89     return new KafkaLikeShardingStrategy(
90         properties.getInmemory().getNumShards());
91   }
92 }