fd42d9df41c9809876f84cc3f1829aee106fe88c
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
12 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
13 import org.springframework.boot.ApplicationRunner;
14 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Configuration;
17
18 import java.time.Clock;
19
20
21 @ConditionalOnProperty(
22     prefix = "chat.backend",
23     name = "services",
24     havingValue = "inmemory",
25     matchIfMissing = true)
26 @Configuration
27 public class KafkaServicesConfiguration implements ApplicationRunner
28 {
29   @Bean
30   ChatHome kafkaChatHome(
31       ChatBackendProperties properties,
32       InMemoryChatHomeService chatHomeService,
33       StorageStrategy storageStrategy)
34   {
35     int numShards = properties.getInmemory().getNumShards();
36     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
37     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
38     return new ShardedChatHome(chatHomes, strategy);
39   }
40
41   @Bean
42   KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
43   {
44     ShardingStrategyType sharding =
45         properties.getInmemory().getShardingStrategy();
46     int numShards = sharding == ShardingStrategyType.none
47         ? 1
48         : properties.getInmemory().getNumShards();
49     int[] ownedShards = sharding == ShardingStrategyType.none
50         ? new int[] { 0 }
51         : properties.getInmemory().getOwnedShards();
52     return new InMemoryChatHomeService(
53         numShards,
54         ownedShards,
55         storageStrategy.read());
56   }
57
58   @Bean
59   InMemoryChatRoomFactory chatRoomFactory(
60       InMemoryChatHomeService service,
61       ShardingStrategy strategy,
62       Clock clock,
63       ChatBackendProperties properties)
64   {
65     return new InMemoryChatRoomFactory(
66         service,
67         strategy,
68         clock,
69         properties.getChatroomBufferSize());
70   }
71
72   @Bean
73   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
74   {
75     return new KafkaLikeShardingStrategy(
76         properties.getKafka().getNumPartitions());
77   }
78 }