1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
12 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
13 import org.springframework.boot.ApplicationRunner;
14 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Configuration;
18 import java.time.Clock;
21 @ConditionalOnProperty(
22 prefix = "chat.backend",
24 havingValue = "inmemory",
25 matchIfMissing = true)
27 public class KafkaServicesConfiguration implements ApplicationRunner
/**
 * Creates the {@link ChatHome} exposed by this configuration as a
 * {@link ShardedChatHome}.
 *
 * The shard count is read from the in-memory section of the properties and
 * is used twice: as the length of the {@link SimpleChatHome} array and as the
 * argument of the {@link KafkaLikeShardingStrategy}.
 *
 * NOTE(review): in the lines visible here, {@code chatHomeService} and
 * {@code storageStrategy} are never referenced and no element of
 * {@code chatHomes} is ever assigned, so the array handed to
 * {@link ShardedChatHome} would contain only {@code null} entries. Confirm
 * whether the code that populates the array is missing.
 *
 * NOTE(review): the shard count comes from {@code getInmemory().getNumShards()},
 * whereas {@code kafkalikeShardingStrategy} in this class uses
 * {@code getKafka().getNumPartitions()} -- verify which one is intended.
 *
 * @param properties      source of the configured number of shards
 * @param chatHomeService not used in the visible lines
 * @param storageStrategy not used in the visible lines
 * @return a {@link ShardedChatHome} built from the array and the strategy
 */
30 ChatHome kafkaChatHome(
31 ChatBackendProperties properties,
32 InMemoryChatHomeService chatHomeService,
33 StorageStrategy storageStrategy)
// Number of shards, taken from the in-memory properties.
35 int numShards = properties.getInmemory().getNumShards();
// One slot per shard; the slots are not filled in the visible lines.
36 SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
37 ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
38 return new ShardedChatHome(chatHomes, strategy);
/**
 * Creates the chat-home service for this configuration.
 *
 * The sharding strategy type is read from the in-memory section of the
 * properties. The number of shards and the owned shards are taken from the
 * same section unless the strategy type is {@code none}; the values used for
 * {@code none} are not visible in this excerpt.
 *
 * NOTE(review): the declared return type is {@code KafkaChatHomeService}, but
 * the visible return statement constructs an {@link InMemoryChatHomeService}
 * -- confirm that the two types are compatible.
 *
 * NOTE(review): {@code storageStrategy} is used below, but it is not a
 * parameter of this method and no declaration of it is visible here.
 *
 * @param properties source of the sharding configuration
 * @return the service instance built from the sharding configuration
 */
42 KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
44 ShardingStrategyType sharding =
45 properties.getInmemory().getShardingStrategy();
// Configured shard count applies when sharding is not "none".
46 int numShards = sharding == ShardingStrategyType.none
48 : properties.getInmemory().getNumShards();
// Configured owned shards apply when sharding is not "none".
49 int[] ownedShards = sharding == ShardingStrategyType.none
51 : properties.getInmemory().getOwnedShards();
// Only the last constructor argument is visible in this excerpt.
52 return new InMemoryChatHomeService(
55 storageStrategy.read());
/**
 * Creates the {@link InMemoryChatRoomFactory}.
 *
 * The last constructor argument is the chat-room buffer size read from the
 * properties; the preceding arguments are not visible in this excerpt.
 *
 * NOTE(review): the parameter list appears to be incomplete here, so the
 * use of {@code service} and {@code strategy} cannot be confirmed.
 *
 * @param service    the in-memory chat-home service
 * @param strategy   the sharding strategy
 * @param properties source of the chat-room buffer size
 * @return a new {@link InMemoryChatRoomFactory}
 */
59 InMemoryChatRoomFactory chatRoomFactory(
60 InMemoryChatHomeService service,
61 ShardingStrategy strategy,
63 ChatBackendProperties properties)
65 return new InMemoryChatRoomFactory(
69 properties.getChatroomBufferSize());
/**
 * Creates a {@link KafkaLikeShardingStrategy} whose constructor argument is
 * the number of partitions configured in the Kafka section of the
 * properties.
 *
 * @param properties source of the configured number of partitions
 * @return a new {@link KafkaLikeShardingStrategy}
 */
73 ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
75 return new KafkaLikeShardingStrategy(
76 properties.getKafka().getNumPartitions());