33371279b8c9de92b2ffa06615d128eed74491a5
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Properties;
33
34
35 @ConditionalOnProperty(
36     prefix = "chat.backend",
37     name = "services",
38     havingValue = "kafka")
39 @Configuration
40 public class KafkaServicesConfiguration
41 {
42   @Bean
43   ConsumerTaskRunner consumerTaskRunner(
44       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45       ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46       InfoChannel infoChannel)
47   {
48     return new ConsumerTaskRunner(
49         infoChannelConsumerTaskExecutor,
50         dataChannelConsumerTaskExecutor,
51         infoChannel);
52   }
53
54   @Bean
55   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56       ThreadPoolTaskExecutor taskExecutor,
57       InfoChannel infoChannel,
58       Consumer<String, AbstractMessageTo> infoChannelConsumer,
59       WorkAssignor infoChannelWorkAssignor)
60   {
61     return new ConsumerTaskExecutor(
62         taskExecutor,
63         infoChannel,
64         infoChannelConsumer,
65         infoChannelWorkAssignor);
66   }
67
68   @Bean
69   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70   {
71     return consumer ->
72     {
73       String topic = properties.getKafka().getInfoChannelTopic();
74       List<TopicPartition> partitions = consumer
75           .partitionsFor(topic)
76           .stream()
77           .map(partitionInfo ->
78               new TopicPartition(topic, partitionInfo.partition()))
79           .toList();
80       consumer.assign(partitions);
81     };
82   }
83
84   @Bean
85   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86       ThreadPoolTaskExecutor taskExecutor,
87       DataChannel dataChannel,
88       Consumer<String, AbstractMessageTo> dataChannelConsumer,
89       WorkAssignor dataChannelWorkAssignor)
90   {
91     return new ConsumerTaskExecutor(
92         taskExecutor,
93         dataChannel,
94         dataChannelConsumer,
95         dataChannelWorkAssignor);
96   }
97
98   @Bean
99   WorkAssignor dataChannelWorkAssignor(
100       ChatBackendProperties properties,
101       DataChannel dataChannel)
102   {
103     return consumer ->
104     {
105       List<String> topics =
106           List.of(properties.getKafka().getDataChannelTopic());
107       consumer.subscribe(topics, dataChannel);
108     };
109   }
110
111   @Bean
112   KafkaChatHomeService kafkaChatHome(
113       ChatBackendProperties properties,
114       InfoChannel infoChannel,
115       DataChannel dataChannel)
116   {
117     return new KafkaChatHomeService(
118         properties.getKafka().getNumPartitions(),
119         infoChannel,
120         dataChannel);
121   }
122
123   @Bean
124   InfoChannel infoChannel(
125       ChatBackendProperties properties,
126       Producer<String, AbstractMessageTo> producer,
127       Consumer<String, AbstractMessageTo> infoChannelConsumer,
128       ChannelMediator channelMediator)
129   {
130     InfoChannel infoChannel = new InfoChannel(
131         properties.getKafka().getInfoChannelTopic(),
132         producer,
133         infoChannelConsumer,
134         properties.getKafka().getPollingInterval(),
135         properties.getKafka().getNumPartitions(),
136         properties.getKafka().getInstanceUri(),
137         channelMediator);
138     channelMediator.setInfoChannel(infoChannel);
139     return infoChannel;
140   }
141
142   @Bean
143   DataChannel dataChannel(
144       ChatBackendProperties properties,
145       Producer<String, AbstractMessageTo> producer,
146       Consumer<String, AbstractMessageTo> dataChannelConsumer,
147       ZoneId zoneId,
148       Clock clock,
149       ChannelMediator channelMediator,
150       ShardingPublisherStrategy shardingPublisherStrategy)
151   {
152     DataChannel dataChannel = new DataChannel(
153         properties.getInstanceId(),
154         properties.getKafka().getDataChannelTopic(),
155         producer,
156         dataChannelConsumer,
157         zoneId,
158         properties.getKafka().getNumPartitions(),
159         properties.getKafka().getPollingInterval(),
160         properties.getChatroomBufferSize(),
161         clock,
162         channelMediator,
163         shardingPublisherStrategy);
164     channelMediator.setDataChannel(dataChannel);
165     return dataChannel;
166   }
167
168   @Bean
169   ChannelMediator channelMediator()
170   {
171     return new ChannelMediator();
172   }
173
174   @Bean
175   Producer<String, AbstractMessageTo>  producer(
176       Properties defaultProducerProperties,
177       ChatBackendProperties chatBackendProperties,
178       StringSerializer stringSerializer,
179       JsonSerializer<AbstractMessageTo> messageSerializer)
180   {
181     Map<String, Object> properties = new HashMap<>();
182     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
183     properties.put(
184         ProducerConfig.CLIENT_ID_CONFIG,
185         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
186     return new KafkaProducer<>(
187         properties,
188         stringSerializer,
189         messageSerializer);
190   }
191
192   @Bean
193   StringSerializer stringSerializer()
194   {
195     return new StringSerializer();
196   }
197
198   @Bean
199   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
200   {
201     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
202     serializer.configure(
203         Map.of(
204             JsonSerializer.TYPE_MAPPINGS, typeMappings),
205         false);
206     return serializer;
207   }
208
209   @Bean
210   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
211       Properties defaultConsumerProperties,
212       ChatBackendProperties chatBackendProperties,
213       StringDeserializer stringDeserializer,
214       JsonDeserializer<AbstractMessageTo> messageDeserializer)
215   {
216     Map<String, Object> properties = new HashMap<>();
217     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
218     properties.put(
219         ConsumerConfig.CLIENT_ID_CONFIG,
220         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
221     properties.put(
222         ConsumerConfig.GROUP_ID_CONFIG,
223         "info_channel");
224     return new KafkaConsumer<>(
225         properties,
226         stringDeserializer,
227         messageDeserializer);
228   }
229
230   @Bean
231   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
232       Properties defaultConsumerProperties,
233       ChatBackendProperties chatBackendProperties,
234       StringDeserializer stringDeserializer,
235       JsonDeserializer<AbstractMessageTo> messageDeserializer)
236   {
237     Map<String, Object> properties = new HashMap<>();
238     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
239     properties.put(
240         ConsumerConfig.CLIENT_ID_CONFIG,
241         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
242     properties.put(
243         ConsumerConfig.GROUP_ID_CONFIG,
244         "data_channel");
245     return new KafkaConsumer<>(
246         properties,
247         stringDeserializer,
248         messageDeserializer);
249   }
250
251   @Bean
252   StringDeserializer stringDeserializer()
253   {
254     return new StringDeserializer();
255   }
256
257   @Bean
258   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
259   {
260     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
261     deserializer.configure(
262         Map.of(
263             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
264             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
265         false );
266     return deserializer;
267   }
268
269   @Bean
270   String typeMappings ()
271   {
272     return
273         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
274         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
275   }
276
277   @Bean
278   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
279   {
280     Properties properties = new Properties();
281     properties.setProperty(
282         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
283         chatBackendProperties.getKafka().getBootstrapServers());
284     return properties;
285   }
286
287   @Bean
288   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
289   {
290     Properties properties = new Properties();
291     properties.setProperty(
292         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
293         chatBackendProperties.getKafka().getBootstrapServers());
294     properties.setProperty(
295         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
296         "false");
297     properties.setProperty(
298         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
299         "earliest");
300     return properties;
301   }
302
303   @Bean
304   ShardingPublisherStrategy shardingPublisherStrategy(
305       ChatBackendProperties properties)
306   {
307     String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
308     InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
309     return new HaproxyShardingPublisherStrategy(
310         haproxyAddress,
311         properties.getKafka().getHaproxyMap(),
312         properties.getInstanceId());
313   }
314
315   @Bean
316   ZoneId zoneId()
317   {
318     return ZoneId.systemDefault();
319   }
320 }