import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
+import java.time.ZoneId;
@ConditionalOnProperty(
prefix = "chat.backend",
name = "services",
- havingValue = "inmemory",
- matchIfMissing = true)
+ havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration implements ApplicationRunner
{
@Bean
ChatHome kafkaChatHome(
- ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel)
{
- int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new KafkaChatHome(shardingStrategy, chatMessageChannel);
}
@Bean
- KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+ KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
{
- ShardingStrategyType sharding =
- properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
- return new InMemoryChatHomeService(
- numShards,
- ownedShards,
- storageStrategy.read());
+ return new KafkaChatRoomFactory(chatRoomChannel);
}
@Bean
- InMemoryChatRoomFactory chatRoomFactory(
- InMemoryChatHomeService service,
- ShardingStrategy strategy,
- Clock clock,
- ChatBackendProperties properties)
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel,
+ Clock clock)
{
- return new InMemoryChatRoomFactory(
- service,
- strategy,
+ return new ChatRoomChannel(
+ properties.getKafka().getTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ shardingStrategy,
+ chatMessageChannel,
clock,
properties.getChatroomBufferSize());
}
@Bean
- ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ ChatMessageChannel chatMessageChannel(
+ ChatBackendProperties properties,
+ Producer<String, MessageTo> chatMessageChannelProducer,
+ Consumer<String, MessageTo> chatMessageChannelConsumer,
+ ZoneId zoneId)
{
- return new KafkaLikeShardingStrategy(
+ return new ChatMessageChannel(
+ properties.getKafka().getTopic(),
+ chatMessageChannelProducer,
+ chatMessageChannelConsumer,
+ zoneId,
properties.getKafka().getNumPartitions());
}
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
}