WIP: fix: activation
[demos/kafka/chat] src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
package de.juplo.kafka.chat.backend.implementation.kafka;

import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


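/**
 * Spring configuration for the Kafka-based implementation of the chat-backend services.
 * Activated via {@code chat.backend.services=kafka}, it wires up the info- and data-channel
 * consumers, the shared producer, the JSON (de)serializers and the HAProxy sharding strategy.
 */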
@ConditionalOnProperty(
    prefix = "chat.backend",
    name = "services",
    havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration
{
  @Bean
  KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
  {
    return new KafkaServicesThreadPoolTaskExecutorCustomizer();
  }

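  /**
   * Runs the {@link InfoChannel} receive loop on the shared task executor.
   * The channel task is started via {@code initMethod} and joined on shutdown.
   */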
  @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
  ChannelTaskExecutor infoChannelTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      InfoChannel infoChannel,
      Consumer<String, AbstractMessageTo> infoChannelConsumer,
      WorkAssignor infoChannelWorkAssignor)
  {
    return new ChannelTaskExecutor(
        taskExecutor,
        infoChannel,
        infoChannelConsumer,
        infoChannelWorkAssignor);
  }

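  /**
   * Assigns all partitions of the info-channel topic to the consumer explicitly,
   * so the instance sees the complete info stream (no consumer-group rebalancing).
   */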
  @Bean
  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
  {
    return consumer ->
    {
      String topic = properties.getKafka().getInfoChannelTopic();
      List<TopicPartition> partitions = consumer
          .partitionsFor(topic)
          .stream()
          .map(partitionInfo ->
              new TopicPartition(topic, partitionInfo.partition()))
          .toList();
      consumer.assign(partitions);
    };
  }

  @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
  ChannelTaskExecutor dataChannelTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      DataChannel dataChannel,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      WorkAssignor dataChannelWorkAssignor)
  {
    return new ChannelTaskExecutor(
        taskExecutor,
        dataChannel,
        dataChannelConsumer,
        dataChannelWorkAssignor);
  }

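  /**
   * Subscribes the consumer to the data-channel topic via its consumer group;
   * the {@link DataChannel} acts as rebalance listener for the assigned partitions.
   */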
  @Bean
  WorkAssignor dataChannelWorkAssignor(
      ChatBackendProperties properties,
      DataChannel dataChannel)
  {
    return consumer ->
    {
      List<String> topics =
          List.of(properties.getKafka().getDataChannelTopic());
      consumer.subscribe(topics, dataChannel);
    };
  }

  @Bean
  KafkaChatHomeService kafkaChatHome(
      ChatBackendProperties properties,
      InfoChannel infoChannel,
      DataChannel dataChannel)
  {
    return new KafkaChatHomeService(
        properties.getKafka().getNumPartitions(),
        infoChannel,
        dataChannel);
  }

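  /**
   * The two channels and the {@link ChannelMediator} reference each other: the mediator is
   * passed into the channel constructors, and the corresponding setter closes the cycle.
   */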
  @Bean
  InfoChannel infoChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> infoChannelConsumer,
      ChannelMediator channelMediator)
  {
    InfoChannel infoChannel = new InfoChannel(
        properties.getKafka().getInfoChannelTopic(),
        producer,
        infoChannelConsumer,
        properties.getKafka().getPollingInterval(),
        properties.getKafka().getNumPartitions(),
        properties.getKafka().getInstanceUri(),
        channelMediator);
    channelMediator.setInfoChannel(infoChannel);
    return infoChannel;
  }

  @Bean
  DataChannel dataChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      ZoneId zoneId,
      Clock clock,
      ChannelMediator channelMediator,
      ShardingPublisherStrategy shardingPublisherStrategy)
  {
    DataChannel dataChannel = new DataChannel(
        properties.getInstanceId(),
        properties.getKafka().getDataChannelTopic(),
        producer,
        dataChannelConsumer,
        zoneId,
        properties.getKafka().getNumPartitions(),
        properties.getKafka().getPollingInterval(),
        properties.getChatroomHistoryLimit(),
        clock,
        channelMediator,
        shardingPublisherStrategy);
    channelMediator.setDataChannel(dataChannel);
    return dataChannel;
  }

  @Bean
  ChannelMediator channelMediator()
  {
    return new ChannelMediator();
  }

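  /**
   * Single producer shared by both channels; the {@code client.id} is derived
   * from the configured prefix.
   */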
  @Bean
  Producer<String, AbstractMessageTo> producer(
      Properties defaultProducerProperties,
      ChatBackendProperties chatBackendProperties,
      StringSerializer stringSerializer,
      JsonSerializer<AbstractMessageTo> messageSerializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ProducerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
    return new KafkaProducer<>(
        properties,
        stringSerializer,
        messageSerializer);
  }

  @Bean
  StringSerializer stringSerializer()
  {
    return new StringSerializer();
  }

  @Bean
  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
  {
    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
    serializer.configure(
        Map.of(
            JsonSerializer.TYPE_MAPPINGS, typeMappings),
        false);
    return serializer;
  }

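  /**
   * Consumer for the info channel, using the fixed group id {@code info_channel};
   * auto-commit is disabled via {@code defaultConsumerProperties}.
   */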
  @Bean
  Consumer<String, AbstractMessageTo> infoChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "info_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  Consumer<String, AbstractMessageTo> dataChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "data_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  StringDeserializer stringDeserializer()
  {
    return new StringDeserializer();
  }

  @Bean
  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
  {
    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
    deserializer.configure(
        Map.of(
            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
        false);
    return deserializer;
  }

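  /**
   * Logical type names shared by serializer and deserializer, so that events are
   * encoded with stable names instead of fully qualified class names.
   */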
  @Bean
  String typeMappings()
  {
    return
        "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
  }

  @Bean
  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    return properties;
  }

  @Bean
  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    properties.setProperty(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        "false");
    properties.setProperty(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest");
    return properties;
  }

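  /**
   * Publishes which instance owns which shard to HAProxy via its runtime API.
   * The configured runtime-API address is expected in {@code host:port} form.
   */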
  @Bean
  ShardingPublisherStrategy shardingPublisherStrategy(
      ChatBackendProperties properties)
  {
    String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
    return new HaproxyShardingPublisherStrategy(
        haproxyAddress,
        properties.getKafka().getHaproxyMap(),
        properties.getInstanceId());
  }

  @Bean
  ZoneId zoneId()
  {
    return ZoneId.systemDefault();
  }

  @Bean
  ChannelReactiveHealthIndicator dataChannelHealthIndicator(
      DataChannel dataChannel)
  {
    return new ChannelReactiveHealthIndicator(dataChannel);
  }

  @Bean
  ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
  {
    return new ChannelReactiveHealthIndicator(infoChannel);
  }
}