1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
12 import org.apache.kafka.clients.consumer.Consumer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.springframework.boot.ApplicationRunner;
15 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Configuration;
19 import java.time.Clock;
20 import java.time.ZoneId;
// Wires up the Kafka-based persistence services of the chat backend
// (chat home, chat-room factory, the two Kafka channels and the sharding strategy).
// The class is only activated when the guarded property under the "chat.backend"
// prefix has the value "kafka".
// It also implements ApplicationRunner, i.e. it takes part in Spring Boot's
// post-startup callback.
// NOTE(review): the methods below look like Spring bean factory methods (Bean and
// Configuration are imported) -- confirm the annotations are in place.
23 @ConditionalOnProperty(
24 prefix = "chat.backend",
26 havingValue = "kafka")
28 public class KafkaServicesConfiguration implements ApplicationRunner
// Creates the chat home used with the Kafka persistence: a KafkaChatHome built
// from the injected sharding strategy and the chat-message channel.
// The declared return type is the ChatHome interface, not the concrete class.
31 ChatHome kafkaChatHome(
32 ShardingStrategy shardingStrategy,
33 ChatMessageChannel chatMessageChannel)
35 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
// Creates the KafkaChatRoomFactory and hands it the injected ChatRoomChannel,
// which is its only constructor argument.
39 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
41 return new KafkaChatRoomFactory(chatRoomChannel);
// Creates the ChatRoomChannel. Among its constructor arguments are the topic name
// taken from the Kafka section of ChatBackendProperties, the injected
// producer/consumer pair (Integer keys, ChatRoomTo values) and the chatroom
// buffer size from ChatBackendProperties.
// The sharding strategy and the chat-message channel are injected as well.
45 ChatRoomChannel chatRoomChannel(
46 ChatBackendProperties properties,
47 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
48 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
49 ShardingStrategy shardingStrategy,
50 ChatMessageChannel chatMessageChannel,
53 return new ChatRoomChannel(
// NOTE(review): chatMessageChannel(..) reads the very same getKafka().getTopic()
// value, although the two channels use different key/value types -- confirm that
// sharing one topic is intended.
54 properties.getKafka().getTopic(),
55 chatRoomChannelProducer,
56 chatRoomChannelConsumer,
60 properties.getChatroomBufferSize());
// Creates the ChatMessageChannel. Among its constructor arguments are the topic
// name and the number of partitions, both taken from the Kafka section of
// ChatBackendProperties, and the injected producer/consumer pair
// (String keys, MessageTo values).
64 ChatMessageChannel chatMessageChannel(
65 ChatBackendProperties properties,
66 Producer<String, MessageTo> chatMessageChannelProducer,
67 Consumer<String, MessageTo> chatMessageChannelConsumer,
70 return new ChatMessageChannel(
// NOTE(review): chatRoomChannel(..) reads the very same getKafka().getTopic()
// value, although the two channels use different key/value types -- confirm that
// sharing one topic is intended.
71 properties.getKafka().getTopic(),
72 chatMessageChannelProducer,
73 chatMessageChannelConsumer,
// Same partition count that shardingStrategy(..) passes to KafkaLikeShardingStrategy.
75 properties.getKafka().getNumPartitions());
// Creates the sharding strategy: a KafkaLikeShardingStrategy constructed with the
// number of partitions configured in the Kafka section of ChatBackendProperties
// (the same value that chatMessageChannel(..) passes to ChatMessageChannel).
// NOTE(review): presumably this makes the shard of a chat room match the Kafka
// partition of its messages -- verify against KafkaLikeShardingStrategy.
79 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
81 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
87 return ZoneId.systemDefault();