b28b6903cde961bcf8406edc399794f28d98a4d1
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24
25 import java.net.InetSocketAddress;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Properties;
32
33
34 @ConditionalOnProperty(
35     prefix = "chat.backend",
36     name = "services",
37     havingValue = "kafka")
38 @Configuration
39 public class KafkaServicesConfiguration
40 {
41   @Bean
42   KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
43   {
44     return new KafkaServicesThreadPoolTaskExecutorCustomizer();
45   }
46
47   @Bean
48   ChannelTaskRunner channelTaskRunner(
49       ChannelTaskExecutor infoChannelTaskExecutor,
50       ChannelTaskExecutor dataChannelTaskExecutor)
51   {
52     return new ChannelTaskRunner(
53         infoChannelTaskExecutor,
54         dataChannelTaskExecutor);
55   }
56
57   @Bean(destroyMethod = "join")
58   ChannelTaskExecutor infoChannelTaskExecutor(
59       ThreadPoolTaskExecutor taskExecutor,
60       InfoChannel infoChannel,
61       Consumer<String, AbstractMessageTo> infoChannelConsumer,
62       WorkAssignor infoChannelWorkAssignor)
63   {
64     return new ChannelTaskExecutor(
65         taskExecutor,
66         infoChannel,
67         infoChannelConsumer,
68         infoChannelWorkAssignor);
69   }
70
71   @Bean
72   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
73   {
74     return consumer ->
75     {
76       String topic = properties.getKafka().getInfoChannelTopic();
77       List<TopicPartition> partitions = consumer
78           .partitionsFor(topic)
79           .stream()
80           .map(partitionInfo ->
81               new TopicPartition(topic, partitionInfo.partition()))
82           .toList();
83       consumer.assign(partitions);
84     };
85   }
86
87   @Bean(destroyMethod = "join")
88   ChannelTaskExecutor dataChannelTaskExecutor(
89       ThreadPoolTaskExecutor taskExecutor,
90       DataChannel dataChannel,
91       Consumer<String, AbstractMessageTo> dataChannelConsumer,
92       WorkAssignor dataChannelWorkAssignor)
93   {
94     return new ChannelTaskExecutor(
95         taskExecutor,
96         dataChannel,
97         dataChannelConsumer,
98         dataChannelWorkAssignor);
99   }
100
101   @Bean
102   WorkAssignor dataChannelWorkAssignor(
103       ChatBackendProperties properties,
104       DataChannel dataChannel)
105   {
106     return consumer ->
107     {
108       List<String> topics =
109           List.of(properties.getKafka().getDataChannelTopic());
110       consumer.subscribe(topics, dataChannel);
111     };
112   }
113
114   @Bean
115   KafkaChatHomeService kafkaChatHome(
116       ChatBackendProperties properties,
117       InfoChannel infoChannel,
118       DataChannel dataChannel)
119   {
120     return new KafkaChatHomeService(
121         properties.getKafka().getNumPartitions(),
122         infoChannel,
123         dataChannel);
124   }
125
126   @Bean
127   InfoChannel infoChannel(
128       ChatBackendProperties properties,
129       Producer<String, AbstractMessageTo> producer,
130       Consumer<String, AbstractMessageTo> infoChannelConsumer,
131       ChannelMediator channelMediator)
132   {
133     InfoChannel infoChannel = new InfoChannel(
134         properties.getKafka().getInfoChannelTopic(),
135         producer,
136         infoChannelConsumer,
137         properties.getKafka().getPollingInterval(),
138         properties.getKafka().getNumPartitions(),
139         properties.getKafka().getInstanceUri(),
140         channelMediator);
141     channelMediator.setInfoChannel(infoChannel);
142     return infoChannel;
143   }
144
145   @Bean
146   DataChannel dataChannel(
147       ChatBackendProperties properties,
148       Producer<String, AbstractMessageTo> producer,
149       Consumer<String, AbstractMessageTo> dataChannelConsumer,
150       ZoneId zoneId,
151       Clock clock,
152       ChannelMediator channelMediator,
153       ShardingPublisherStrategy shardingPublisherStrategy)
154   {
155     DataChannel dataChannel = new DataChannel(
156         properties.getInstanceId(),
157         properties.getKafka().getDataChannelTopic(),
158         producer,
159         dataChannelConsumer,
160         zoneId,
161         properties.getKafka().getNumPartitions(),
162         properties.getKafka().getPollingInterval(),
163         properties.getChatroomBufferSize(),
164         clock,
165         channelMediator,
166         shardingPublisherStrategy);
167     channelMediator.setDataChannel(dataChannel);
168     return dataChannel;
169   }
170
171   @Bean
172   ChannelMediator channelMediator()
173   {
174     return new ChannelMediator();
175   }
176
177   @Bean
178   Producer<String, AbstractMessageTo>  producer(
179       Properties defaultProducerProperties,
180       ChatBackendProperties chatBackendProperties,
181       StringSerializer stringSerializer,
182       JsonSerializer<AbstractMessageTo> messageSerializer)
183   {
184     Map<String, Object> properties = new HashMap<>();
185     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
186     properties.put(
187         ProducerConfig.CLIENT_ID_CONFIG,
188         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
189     return new KafkaProducer<>(
190         properties,
191         stringSerializer,
192         messageSerializer);
193   }
194
195   @Bean
196   StringSerializer stringSerializer()
197   {
198     return new StringSerializer();
199   }
200
201   @Bean
202   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
203   {
204     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
205     serializer.configure(
206         Map.of(
207             JsonSerializer.TYPE_MAPPINGS, typeMappings),
208         false);
209     return serializer;
210   }
211
212   @Bean
213   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
214       Properties defaultConsumerProperties,
215       ChatBackendProperties chatBackendProperties,
216       StringDeserializer stringDeserializer,
217       JsonDeserializer<AbstractMessageTo> messageDeserializer)
218   {
219     Map<String, Object> properties = new HashMap<>();
220     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
221     properties.put(
222         ConsumerConfig.CLIENT_ID_CONFIG,
223         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
224     properties.put(
225         ConsumerConfig.GROUP_ID_CONFIG,
226         "info_channel");
227     return new KafkaConsumer<>(
228         properties,
229         stringDeserializer,
230         messageDeserializer);
231   }
232
233   @Bean
234   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
235       Properties defaultConsumerProperties,
236       ChatBackendProperties chatBackendProperties,
237       StringDeserializer stringDeserializer,
238       JsonDeserializer<AbstractMessageTo> messageDeserializer)
239   {
240     Map<String, Object> properties = new HashMap<>();
241     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
242     properties.put(
243         ConsumerConfig.CLIENT_ID_CONFIG,
244         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
245     properties.put(
246         ConsumerConfig.GROUP_ID_CONFIG,
247         "data_channel");
248     return new KafkaConsumer<>(
249         properties,
250         stringDeserializer,
251         messageDeserializer);
252   }
253
254   @Bean
255   StringDeserializer stringDeserializer()
256   {
257     return new StringDeserializer();
258   }
259
260   @Bean
261   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
262   {
263     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
264     deserializer.configure(
265         Map.of(
266             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
267             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
268         false );
269     return deserializer;
270   }
271
272   @Bean
273   String typeMappings ()
274   {
275     return
276         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
277         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
278   }
279
280   @Bean
281   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
282   {
283     Properties properties = new Properties();
284     properties.setProperty(
285         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
286         chatBackendProperties.getKafka().getBootstrapServers());
287     return properties;
288   }
289
290   @Bean
291   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
292   {
293     Properties properties = new Properties();
294     properties.setProperty(
295         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
296         chatBackendProperties.getKafka().getBootstrapServers());
297     properties.setProperty(
298         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
299         "false");
300     properties.setProperty(
301         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
302         "earliest");
303     return properties;
304   }
305
306   @Bean
307   ShardingPublisherStrategy shardingPublisherStrategy(
308       ChatBackendProperties properties)
309   {
310     String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
311     InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
312     return new HaproxyShardingPublisherStrategy(
313         haproxyAddress,
314         properties.getKafka().getHaproxyMap(),
315         properties.getInstanceId());
316   }
317
318   @Bean
319   ZoneId zoneId()
320   {
321     return ZoneId.systemDefault();
322   }
323
324   @Bean
325   ChannelReactiveHealthIndicator dataChannelHealthIndicator(
326       DataChannel dataChannel)
327   {
328     return new ChannelReactiveHealthIndicator(dataChannel);
329   }
330
331   @Bean
332   ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
333   {
334     return new ChannelReactiveHealthIndicator(infoChannel);
335   }
336 }