525c427a51b2944193f17573217018d5527eeb20
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24
25 import java.net.InetSocketAddress;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Properties;
32
33
34 @ConditionalOnProperty(
35     prefix = "chat.backend",
36     name = "services",
37     havingValue = "kafka")
38 @Configuration
39 public class KafkaServicesConfiguration
40 {
41   @Bean
42   ChannelTaskRunner channelTaskRunner(
43       ChannelTaskExecutor infoChannelTaskExecutor,
44       ChannelTaskExecutor dataChannelTaskExecutor)
45   {
46     return new ChannelTaskRunner(
47         infoChannelTaskExecutor,
48         dataChannelTaskExecutor);
49   }
50
51   @Bean
52   ChannelTaskExecutor infoChannelTaskExecutor(
53       ThreadPoolTaskExecutor taskExecutor,
54       InfoChannel infoChannel,
55       Consumer<String, AbstractMessageTo> infoChannelConsumer,
56       WorkAssignor infoChannelWorkAssignor)
57   {
58     return new ChannelTaskExecutor(
59         taskExecutor,
60         infoChannel,
61         infoChannelConsumer,
62         infoChannelWorkAssignor);
63   }
64
65   @Bean
66   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
67   {
68     return consumer ->
69     {
70       String topic = properties.getKafka().getInfoChannelTopic();
71       List<TopicPartition> partitions = consumer
72           .partitionsFor(topic)
73           .stream()
74           .map(partitionInfo ->
75               new TopicPartition(topic, partitionInfo.partition()))
76           .toList();
77       consumer.assign(partitions);
78     };
79   }
80
81   @Bean
82   ChannelTaskExecutor dataChannelTaskExecutor(
83       ThreadPoolTaskExecutor taskExecutor,
84       DataChannel dataChannel,
85       Consumer<String, AbstractMessageTo> dataChannelConsumer,
86       WorkAssignor dataChannelWorkAssignor)
87   {
88     return new ChannelTaskExecutor(
89         taskExecutor,
90         dataChannel,
91         dataChannelConsumer,
92         dataChannelWorkAssignor);
93   }
94
95   @Bean
96   WorkAssignor dataChannelWorkAssignor(
97       ChatBackendProperties properties,
98       DataChannel dataChannel)
99   {
100     return consumer ->
101     {
102       List<String> topics =
103           List.of(properties.getKafka().getDataChannelTopic());
104       consumer.subscribe(topics, dataChannel);
105     };
106   }
107
108   @Bean
109   KafkaChatHomeService kafkaChatHome(
110       ChatBackendProperties properties,
111       InfoChannel infoChannel,
112       DataChannel dataChannel)
113   {
114     return new KafkaChatHomeService(
115         properties.getKafka().getNumPartitions(),
116         infoChannel,
117         dataChannel);
118   }
119
120   @Bean
121   InfoChannel infoChannel(
122       ChatBackendProperties properties,
123       Producer<String, AbstractMessageTo> producer,
124       Consumer<String, AbstractMessageTo> infoChannelConsumer,
125       ChannelMediator channelMediator)
126   {
127     InfoChannel infoChannel = new InfoChannel(
128         properties.getKafka().getInfoChannelTopic(),
129         producer,
130         infoChannelConsumer,
131         properties.getKafka().getPollingInterval(),
132         properties.getKafka().getNumPartitions(),
133         properties.getKafka().getInstanceUri(),
134         channelMediator);
135     channelMediator.setInfoChannel(infoChannel);
136     return infoChannel;
137   }
138
139   @Bean
140   DataChannel dataChannel(
141       ChatBackendProperties properties,
142       Producer<String, AbstractMessageTo> producer,
143       Consumer<String, AbstractMessageTo> dataChannelConsumer,
144       ZoneId zoneId,
145       Clock clock,
146       ChannelMediator channelMediator,
147       ShardingPublisherStrategy shardingPublisherStrategy)
148   {
149     DataChannel dataChannel = new DataChannel(
150         properties.getInstanceId(),
151         properties.getKafka().getDataChannelTopic(),
152         producer,
153         dataChannelConsumer,
154         zoneId,
155         properties.getKafka().getNumPartitions(),
156         properties.getKafka().getPollingInterval(),
157         properties.getChatroomBufferSize(),
158         clock,
159         channelMediator,
160         shardingPublisherStrategy);
161     channelMediator.setDataChannel(dataChannel);
162     return dataChannel;
163   }
164
165   @Bean
166   ChannelMediator channelMediator()
167   {
168     return new ChannelMediator();
169   }
170
171   @Bean
172   Producer<String, AbstractMessageTo>  producer(
173       Properties defaultProducerProperties,
174       ChatBackendProperties chatBackendProperties,
175       StringSerializer stringSerializer,
176       JsonSerializer<AbstractMessageTo> messageSerializer)
177   {
178     Map<String, Object> properties = new HashMap<>();
179     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
180     properties.put(
181         ProducerConfig.CLIENT_ID_CONFIG,
182         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
183     return new KafkaProducer<>(
184         properties,
185         stringSerializer,
186         messageSerializer);
187   }
188
189   @Bean
190   StringSerializer stringSerializer()
191   {
192     return new StringSerializer();
193   }
194
195   @Bean
196   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
197   {
198     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
199     serializer.configure(
200         Map.of(
201             JsonSerializer.TYPE_MAPPINGS, typeMappings),
202         false);
203     return serializer;
204   }
205
206   @Bean
207   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
208       Properties defaultConsumerProperties,
209       ChatBackendProperties chatBackendProperties,
210       StringDeserializer stringDeserializer,
211       JsonDeserializer<AbstractMessageTo> messageDeserializer)
212   {
213     Map<String, Object> properties = new HashMap<>();
214     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
215     properties.put(
216         ConsumerConfig.CLIENT_ID_CONFIG,
217         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
218     properties.put(
219         ConsumerConfig.GROUP_ID_CONFIG,
220         "info_channel");
221     return new KafkaConsumer<>(
222         properties,
223         stringDeserializer,
224         messageDeserializer);
225   }
226
227   @Bean
228   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
229       Properties defaultConsumerProperties,
230       ChatBackendProperties chatBackendProperties,
231       StringDeserializer stringDeserializer,
232       JsonDeserializer<AbstractMessageTo> messageDeserializer)
233   {
234     Map<String, Object> properties = new HashMap<>();
235     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
236     properties.put(
237         ConsumerConfig.CLIENT_ID_CONFIG,
238         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
239     properties.put(
240         ConsumerConfig.GROUP_ID_CONFIG,
241         "data_channel");
242     return new KafkaConsumer<>(
243         properties,
244         stringDeserializer,
245         messageDeserializer);
246   }
247
248   @Bean
249   StringDeserializer stringDeserializer()
250   {
251     return new StringDeserializer();
252   }
253
254   @Bean
255   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
256   {
257     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
258     deserializer.configure(
259         Map.of(
260             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
261             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
262         false );
263     return deserializer;
264   }
265
266   @Bean
267   String typeMappings ()
268   {
269     return
270         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
271         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
272   }
273
274   @Bean
275   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
276   {
277     Properties properties = new Properties();
278     properties.setProperty(
279         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
280         chatBackendProperties.getKafka().getBootstrapServers());
281     return properties;
282   }
283
284   @Bean
285   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
286   {
287     Properties properties = new Properties();
288     properties.setProperty(
289         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
290         chatBackendProperties.getKafka().getBootstrapServers());
291     properties.setProperty(
292         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
293         "false");
294     properties.setProperty(
295         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
296         "earliest");
297     return properties;
298   }
299
300   @Bean
301   ShardingPublisherStrategy shardingPublisherStrategy(
302       ChatBackendProperties properties)
303   {
304     String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
305     InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
306     return new HaproxyShardingPublisherStrategy(
307         haproxyAddress,
308         properties.getKafka().getHaproxyMap(),
309         properties.getInstanceId());
310   }
311
312   @Bean
313   ZoneId zoneId()
314   {
315     return ZoneId.systemDefault();
316   }
317
318   @Bean
319   ChannelReactiveHealthIndicator dataChannelHealthIndicator(
320       DataChannel dataChannel)
321   {
322     return new ChannelReactiveHealthIndicator(dataChannel);
323   }
324
325   @Bean
326   ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
327   {
328     return new ChannelReactiveHealthIndicator(infoChannel);
329   }
330 }