NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
5 import de.juplo.kafka.chat.backend.domain.ChatHome;
6 import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
7 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
8 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
9 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
10 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
12 import org.apache.kafka.clients.consumer.Consumer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.springframework.boot.ApplicationRunner;
15 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Configuration;
18
19 import java.time.Clock;
20 import java.time.ZoneId;
21
22
23 @ConditionalOnProperty(
24     prefix = "chat.backend",
25     name = "services",
26     havingValue = "kafka")
27 @Configuration
28 public class KafkaServicesConfiguration implements ApplicationRunner
29 {
30   @Bean
31   ChatHome kafkaChatHome(
32       ShardingStrategy shardingStrategy,
33       ChatMessageChannel chatMessageChannel)
34   {
35     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
36   }
37
38   @Bean
39   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
40   {
41     return new KafkaChatRoomFactory(chatRoomChannel);
42   }
43
44   @Bean
45   ChatRoomChannel chatRoomChannel(
46       ChatBackendProperties properties,
47       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
48       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
49       ShardingStrategy shardingStrategy,
50       ChatMessageChannel chatMessageChannel,
51       Clock clock)
52   {
53     return new ChatRoomChannel(
54         properties.getKafka().getTopic(),
55         chatRoomChannelProducer,
56         chatRoomChannelConsumer,
57         shardingStrategy,
58         chatMessageChannel,
59         clock,
60         properties.getChatroomBufferSize());
61   }
62
63   @Bean
64   ChatMessageChannel chatMessageChannel(
65       ChatBackendProperties properties,
66       Producer<String, MessageTo> chatMessageChannelProducer,
67       Consumer<String, MessageTo> chatMessageChannelConsumer,
68       ZoneId zoneId)
69   {
70     return new ChatMessageChannel(
71         properties.getKafka().getTopic(),
72         chatMessageChannelProducer,
73         chatMessageChannelConsumer,
74         zoneId,
75         properties.getKafka().getNumPartitions());
76   }
77
78   @Bean
79   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
80   {
81     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
82   }
83
84   @Bean
85   ZoneId zoneId()
86   {
87     return ZoneId.systemDefault();
88   }
89 }