NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
26 import java.util.Map;
27
28
29 @ConditionalOnProperty(
30     prefix = "chat.backend",
31     name = "services",
32     havingValue = "kafka")
33 @Configuration
34 public class KafkaServicesConfiguration
35 {
36   @Bean
37   ChatHome kafkaChatHome(
38       ShardingStrategy shardingStrategy,
39       ChatMessageChannel chatMessageChannel)
40   {
41     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
42   }
43
44   @Bean
45   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
46   {
47     return new KafkaChatRoomFactory(chatRoomChannel);
48   }
49
50   @Bean
51   ChatRoomChannel chatRoomChannel(
52       ChatBackendProperties properties,
53       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
54       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
55       ShardingStrategy shardingStrategy,
56       ChatMessageChannel chatMessageChannel,
57       Clock clock)
58   {
59     return new ChatRoomChannel(
60         properties.getKafka().getTopic(),
61         chatRoomChannelProducer,
62         chatRoomChannelConsumer,
63         shardingStrategy,
64         chatMessageChannel,
65         clock,
66         properties.getChatroomBufferSize());
67   }
68
69   @Bean
70   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
71       Map<String, String> defaultProducerProperties,
72       IntegerSerializer integerSerializer,
73       JsonSerializer<ChatRoomTo> chatRoomSerializer)
74   {
75     return new KafkaProducer<>(
76         defaultProducerProperties,
77         integerSerializer,
78         chatRoomSerializer);
79   }
80
81   @Bean
82   IntegerSerializer integerSerializer()
83   {
84     return new IntegerSerializer();
85   }
86
87   @Bean
88   JsonSerializer<ChatRoomTo> chatRoomSerializer()
89   {
90     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
91     return serializer;
92   }
93
94   @Bean
95   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
96       Map<String, String> defaultConsumerProperties,
97       IntegerDeserializer integerDeserializer,
98       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
99   {
100     Map<String, String> properties = new HashMap<>();
101     properties.put(
102         ConsumerConfig.GROUP_ID_CONFIG,
103         "chat_room_channel");
104     return new KafkaConsumer<>(
105         properties,
106         integerDeserializer,
107         chatRoomDeserializer);
108   }
109
110   @Bean
111   IntegerDeserializer integerDeserializer()
112   {
113     return new IntegerDeserializer();
114   }
115
116   @Bean
117   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
118   {
119     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
120     return deserializer;
121   }
122
123   @Bean
124   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
125   {
126     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
127   }
128
129   @Bean
130   ChatMessageChannel chatMessageChannel(
131       ChatBackendProperties properties,
132       Producer<String, MessageTo> chatMessageChannelProducer,
133       Consumer<String, MessageTo> chatMessageChannelConsumer,
134       ZoneId zoneId)
135   {
136     return new ChatMessageChannel(
137         properties.getKafka().getTopic(),
138         chatMessageChannelProducer,
139         chatMessageChannelConsumer,
140         zoneId,
141         properties.getKafka().getNumPartitions());
142   }
143
144   @Bean
145   Producer<String, MessageTo>  chatMessageChannelProducer(
146       Map<String, String> defaultProducerProperties,
147       StringSerializer stringSerializer,
148       JsonSerializer<MessageTo> messageSerializer)
149   {
150     return new KafkaProducer<>(
151         defaultProducerProperties,
152         stringSerializer,
153         messageSerializer);
154   }
155
156   @Bean
157   StringSerializer stringSerializer()
158   {
159     return new StringSerializer();
160   }
161
162   @Bean
163   JsonSerializer<MessageTo> chatMessageSerializer()
164   {
165     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
166     return serializer;
167   }
168
169   @Bean
170   Consumer<String, MessageTo>  chatMessageChannelConsumer(
171       Map<String, String> defaultConsumerProperties,
172       StringDeserializer stringDeserializer,
173       JsonDeserializer<MessageTo> messageDeserializer)
174   {
175     Map<String, String> properties = new HashMap<>();
176     properties.put(
177         ConsumerConfig.GROUP_ID_CONFIG,
178         "chat_message_channel");
179     return new KafkaConsumer<>(
180         properties,
181         stringDeserializer,
182         messageDeserializer);
183   }
184
185   @Bean
186   StringDeserializer stringDeserializer()
187   {
188     return new StringDeserializer();
189   }
190
191   @Bean
192   JsonDeserializer<MessageTo> chatMessageDeserializer()
193   {
194     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
195     return deserializer;
196   }
197
198   @Bean
199   Map<String, String> defaultProducerProperties(ChatBackendProperties chatBackendProperties)
200   {
201     return Map.of(
202         ProducerConfig.CLIENT_ID_CONFIG,
203         chatBackendProperties.getKafka().getClientId(),
204         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
205         chatBackendProperties.getKafka().getBootstrapServers());
206   }
207
208   @Bean
209   Map<String, String> defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
210   {
211     return Map.of(
212         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
213         chatBackendProperties.getKafka().getBootstrapServers(),
214         ConsumerConfig.CLIENT_ID_CONFIG,
215         chatBackendProperties.getKafka().getClientId(),
216         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
217         "false",
218         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
219         "earliest");
220   }
221
222   @Bean
223   ZoneId zoneId()
224   {
225     return ZoneId.systemDefault();
226   }
227 }