NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
12 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
13 import org.springframework.boot.ApplicationRunner;
14 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Configuration;
17
18 import java.time.Clock;
19
20
21 @ConditionalOnProperty(
22     prefix = "chat.backend",
23     name = "services",
24     havingValue = "kafka")
25 @Configuration
26 public class KafkaServicesConfiguration implements ApplicationRunner
27 {
28   @Bean
29   ChatHome kafkaChatHome(
30       ChatBackendProperties properties,
31       KafkaChatHomeService chatHomeService)
32   {
33     int numShards = properties.getInmemory().getNumShards();
34     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
35     for (int shard = 0; shard < numShards; shard++)
36     {
37
38     }
39         .read()
40         .subscribe(chatRoom ->
41         {
42           int shard = chatRoom.getShard();
43           if (chatHomes[shard] == null)
44             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
45         });
46     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
47     return new ShardedChatHome(chatHomes, strategy);
48   }
49
50   @Bean
51   KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
52   {
53     ShardingStrategyType sharding =
54         properties.getInmemory().getShardingStrategy();
55     int numShards = sharding == ShardingStrategyType.none
56         ? 1
57         : properties.getInmemory().getNumShards();
58     int[] ownedShards = sharding == ShardingStrategyType.none
59         ? new int[] { 0 }
60         : properties.getInmemory().getOwnedShards();
61     return new InMemoryChatHomeService(
62         numShards,
63         ownedShards,
64         storageStrategy.read());
65   }
66
67   @Bean
68   InMemoryChatRoomFactory chatRoomFactory(
69       InMemoryChatHomeService service,
70       ShardingStrategy strategy,
71       Clock clock,
72       ChatBackendProperties properties)
73   {
74     return new InMemoryChatRoomFactory(
75         service,
76         strategy,
77         clock,
78         properties.getChatroomBufferSize());
79   }
80
81   @Bean
82   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
83   {
84     return new KafkaLikeShardingStrategy(
85         properties.getKafka().getNumPartitions());
86   }
87 }