@ConditionalOnProperty(
prefix = "chat.backend",
name = "services",
- havingValue = "inmemory",
- matchIfMissing = true)
+ havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration implements ApplicationRunner
{
@Bean
ChatHome kafkaChatHome(
ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
+ KafkaChatHomeService chatHomeService)
{
int numShards = properties.getInmemory().getNumShards();
SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+ for (int shard = 0; shard < numShards; shard++)
+ {
+
+ }
+ .read()
+ .subscribe(chatRoom ->
+ {
+ int shard = chatRoom.getShard();
+ if (chatHomes[shard] == null)
+ chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+ });
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHome(chatHomes, strategy);
}