1 package de.juplo.kafka.chat.backend.implementation.inmemory;
 
   3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 
   4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 
   5 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
 
   6 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
 
   7 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 
   8 import org.springframework.context.annotation.Bean;
 
   9 import org.springframework.context.annotation.Configuration;
 
  11 import java.time.Clock;
 
  12 import java.util.stream.IntStream;
 
  // Declares the in-memory service wiring for the chat backend.
  // The condition below uses prefix "chat.backend", expects the value "inmemory",
  // and also matches when the property is absent (matchIfMissing = true).
  // NOTE(review): the `name` attribute of this condition and any further class-level
  // annotations are not visible in this excerpt (listing lines 17 and 20 are missing)
  // -- confirm against the full file.
  15 @ConditionalOnProperty(
 
  16     prefix = "chat.backend",
 
  18     havingValue = "inmemory",
 
  19     matchIfMissing = true)
 
  21 public class InMemoryServicesConfiguration
 
  // Creates one SimpleChatHomeService, restores it from the given StorageStrategy
  // and returns it.
  // Condition: property "chat.backend.inmemory.sharding-strategy"; it also matches
  // when that property is absent (matchIfMissing = true).
  // NOTE(review): the `havingValue` line (listing line 27), the end of the parameter
  // list (line 32) and the first constructor argument (line 35) are not visible in
  // this excerpt -- confirm against the full file.
  24   @ConditionalOnProperty(
 
  25       prefix = "chat.backend.inmemory",
 
  26       name = "sharding-strategy",
 
  28       matchIfMissing = true)
 
  29   SimpleChatHomeService noneShardingChatHome(
 
  30       ChatBackendProperties properties,
 
  31       StorageStrategy storageStrategy,
 
  34     SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
 
  36         properties.getChatroomBufferSize());
 
  // block() is called on the result of restore(...); presumably this waits for the
  // restore to finish before the service is handed out -- TODO confirm return type.
  37     chatHomeService.restore(storageStrategy).block();
 
  38     return chatHomeService;
 
  // Builds a ShardedChatHomeService backed by one SimpleChatHomeService per shard
  // listed in properties.getInmemory().getOwnedShards().
  // Condition: "chat.backend.inmemory.sharding-strategy" must have the value "kafkalike".
  // NOTE(review): several listing lines are missing from this excerpt (49-50, 53,
  // 55-56, 58-59, 62, 66, 68): the end of the parameter list, the receiver of `.of(...)`,
  // the per-shard lambda header and some constructor arguments -- confirm against
  // the full file.
  42   @ConditionalOnProperty(
 
  43       prefix = "chat.backend.inmemory",
 
  44       name = "sharding-strategy",
 
  45       havingValue = "kafkalike")
 
  46   ShardedChatHomeService kafkalikeShardingChatHome(
 
  47       ChatBackendProperties properties,
 
  48       StorageStrategy storageStrategy,
 
  51     int numShards = properties.getInmemory().getNumShards();
 
  // One slot per shard. In the visible code only the indexes taken from
  // getOwnedShards() are assigned, so the remaining slots presumably stay null.
  52     SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
 
  // Receiver of `.of(...)` is not visible; presumably IntStream (imported by the file).
  54         .of(properties.getInmemory().getOwnedShards())
 
  // The new service is stored in chatHomes[shard] and also kept in a local for the restore.
  57           SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
 
  60               properties.getChatroomBufferSize());
 
  // block() is called on the result of restore(...) for each owned shard.
  61           service.restore(storageStrategy).block();
 
  // NOTE(review): a KafkaLikeShardingStrategy is constructed here directly, using the
  // same numShards value that kafkalikeShardingStrategy(...) in this file passes.
  63     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
 
  64     return new ShardedChatHomeService(
 
  65         properties.getInstanceId(),
 
  67         properties.getInmemory().getShardOwners(),
 
  // Provides a ShardingStrategy that maps every chatRoomId to 0.
  // Condition: property "chat.backend.inmemory.sharding-strategy"; it also matches
  // when that property is absent (matchIfMissing = true).
  // NOTE(review): the `havingValue` line (listing line 74) and the method braces
  // are not visible in this excerpt -- confirm against the full file.
  71   @ConditionalOnProperty(
 
  72       prefix = "chat.backend.inmemory",
 
  73       name = "sharding-strategy",
 
  75       matchIfMissing = true)
 
  77   ShardingStrategy defaultShardingStrategy()
 
  // The lambda ignores its argument and always yields 0.
  79     return chatRoomId -> 0;
 
  // Provides a KafkaLikeShardingStrategy constructed with the shard count read from
  // properties.getInmemory().getNumShards().
  // Condition: "chat.backend.inmemory.sharding-strategy" must have the value "kafkalike".
  // NOTE(review): the method braces are not visible in this excerpt.
  82   @ConditionalOnProperty(
 
  83       prefix = "chat.backend.inmemory",
 
  84       name = "sharding-strategy",
 
  85       havingValue = "kafkalike")
 
  87   ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
 
  89     return new KafkaLikeShardingStrategy(
 
  90         properties.getInmemory().getNumShards());