1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
12 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
13 import org.springframework.boot.ApplicationRunner;
14 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Configuration;
18 import java.time.Clock;
// Services configuration for the Kafka-backed variant of the chat backend.
// It is conditional on a property under the prefix "chat.backend" having the
// value "kafka"; the property name itself is on a line that is not part of
// this listing.
21 @ConditionalOnProperty(
22 prefix = "chat.backend",
24 havingValue = "kafka")
// NOTE(review): the class declares ApplicationRunner, but no run(..) method is
// visible in this listing -- confirm that it is implemented elsewhere in the
// class.
26 public class KafkaServicesConfiguration implements ApplicationRunner
// Builds the ChatHome: an array with one SimpleChatHome slot per shard,
// wrapped in a ShardedChatHome that is given a KafkaLikeShardingStrategy
// created with the same shard count.
29 ChatHome kafkaChatHome(
30 ChatBackendProperties properties,
31 KafkaChatHomeService chatHomeService)
// NOTE(review): the shard count is read from the in-memory section of the
// properties, whereas kafkalikeShardingStrategy(..) in this class uses
// getKafka().getNumPartitions() -- confirm which value is intended here.
33 int numShards = properties.getInmemory().getNumShards();
34 SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
35 for (int shard = 0; shard < numShards; shard++)
// (The loop body and the object that subscribe(..) is called on are not part
// of this listing.)
40 .subscribe(chatRoom ->
// NOTE(review): this lambda-local variable has the same name as the loop
// variable above; if the lambda is nested inside that loop, the compiler
// rejects the redeclaration -- confirm the actual scoping.
42 int shard = chatRoom.getShard();
// Create the SimpleChatHome of a shard lazily, the first time a chat room
// that reports this shard is seen.
43 if (chatHomes[shard] == null)
44 chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
// NOTE(review): slots for which the callback above has not run are still null
// when the array is handed to ShardedChatHome -- verify that this is handled.
46 ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
47 return new ShardedChatHome(chatHomes, strategy);
// Creates the chat-home service from the sharding settings found in the
// in-memory section of the properties.
// NOTE(review): the declared return type is KafkaChatHomeService, but the
// object created below is an InMemoryChatHomeService -- this only compiles if
// the latter is a subtype of the former; confirm that this is intended.
51 KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
53 ShardingStrategyType sharding =
54 properties.getInmemory().getShardingStrategy();
// Both values below depend on whether sharding is switched off
// (ShardingStrategyType.none); the value used in that case is on lines that
// are not part of this listing. Otherwise the configured values are taken.
55 int numShards = sharding == ShardingStrategyType.none
57 : properties.getInmemory().getNumShards();
58 int[] ownedShards = sharding == ShardingStrategyType.none
60 : properties.getInmemory().getOwnedShards();
61 return new InMemoryChatHomeService(
// NOTE(review): `storageStrategy` is not among the parameters visible in the
// signature above -- confirm where it is declared.
64 storageStrategy.read());
// Creates the InMemoryChatRoomFactory. The last constructor argument is the
// chat-room buffer size taken from the properties; the remaining constructor
// arguments and one further parameter are on lines that are not part of this
// listing.
// NOTE(review): this Kafka configuration wires in-memory types
// (InMemoryChatHomeService, InMemoryChatRoomFactory) -- confirm that this is
// intended.
68 InMemoryChatRoomFactory chatRoomFactory(
69 InMemoryChatHomeService service,
70 ShardingStrategy strategy,
72 ChatBackendProperties properties)
74 return new InMemoryChatRoomFactory(
78 properties.getChatroomBufferSize());
// Creates a KafkaLikeShardingStrategy whose constructor argument is the
// number of partitions configured in the Kafka section of the properties.
82 ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
84 return new KafkaLikeShardingStrategy(
85 properties.getKafka().getNumPartitions());