f78beb1074026828fdc10f4998693d070bbfd813
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Properties;
33
34
35 @ConditionalOnProperty(
36     prefix = "chat.backend",
37     name = "services",
38     havingValue = "kafka")
39 @Configuration
40 public class KafkaServicesConfiguration
41 {
42   @Bean
43   ConsumerTaskRunner consumerTaskRunner(
44       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45       ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46       InfoChannel infoChannel)
47   {
48     return new ConsumerTaskRunner(
49         infoChannelConsumerTaskExecutor,
50         dataChannelConsumerTaskExecutor,
51         infoChannel);
52   }
53
54   @Bean
55   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56       ThreadPoolTaskExecutor taskExecutor,
57       InfoChannel infoChannel,
58       Consumer<String, AbstractMessageTo> infoChannelConsumer,
59       WorkAssignor infoChannelWorkAssignor)
60   {
61     return new ConsumerTaskExecutor(
62         taskExecutor,
63         infoChannel,
64         infoChannelConsumer,
65         infoChannelWorkAssignor);
66   }
67
68   @Bean
69   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70   {
71     return consumer ->
72     {
73       String topic = properties.getKafka().getInfoChannelTopic();
74       List<TopicPartition> partitions = consumer
75           .partitionsFor(topic)
76           .stream()
77           .map(partitionInfo ->
78               new TopicPartition(topic, partitionInfo.partition()))
79           .toList();
80       consumer.assign(partitions);
81     };
82   }
83
84   @Bean
85   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86       ThreadPoolTaskExecutor taskExecutor,
87       DataChannel dataChannel,
88       Consumer<String, AbstractMessageTo> dataChannelConsumer,
89       WorkAssignor dataChannelWorkAssignor)
90   {
91     return new ConsumerTaskExecutor(
92         taskExecutor,
93         dataChannel,
94         dataChannelConsumer,
95         dataChannelWorkAssignor);
96   }
97
98   @Bean
99   WorkAssignor dataChannelWorkAssignor(
100       ChatBackendProperties properties,
101       DataChannel dataChannel)
102   {
103     return consumer ->
104     {
105       List<String> topics =
106           List.of(properties.getKafka().getDataChannelTopic());
107       consumer.subscribe(topics, dataChannel);
108     };
109   }
110
111   @Bean
112   KafkaChatHomeService kafkaChatHome(
113       ChatBackendProperties properties,
114       InfoChannel infoChannel,
115       DataChannel dataChannel)
116   {
117     return new KafkaChatHomeService(
118         properties.getKafka().getNumPartitions(),
119         infoChannel,
120         dataChannel);
121   }
122
123   @Bean
124   InfoChannel infoChannel(
125       ChatBackendProperties properties,
126       Producer<String, AbstractMessageTo> producer,
127       Consumer<String, AbstractMessageTo> infoChannelConsumer,
128       ChannelMediator channelMediator)
129   {
130     InfoChannel infoChannel = new InfoChannel(
131         properties.getKafka().getInfoChannelTopic(),
132         producer,
133         infoChannelConsumer,
134         properties.getKafka().getPollingInterval(),
135         properties.getKafka().getNumPartitions(),
136         properties.getKafka().getInstanceUri());
137     channelMediator.setInfoChannel(infoChannel);
138     return infoChannel;
139   }
140
141   @Bean
142   DataChannel dataChannel(
143       ChatBackendProperties properties,
144       Producer<String, AbstractMessageTo> producer,
145       Consumer<String, AbstractMessageTo> dataChannelConsumer,
146       ZoneId zoneId,
147       Clock clock,
148       ChannelMediator channelMediator,
149       ShardingPublisherStrategy shardingPublisherStrategy)
150   {
151     return new DataChannel(
152         properties.getInstanceId(),
153         properties.getKafka().getDataChannelTopic(),
154         producer,
155         dataChannelConsumer,
156         zoneId,
157         properties.getKafka().getNumPartitions(),
158         properties.getKafka().getPollingInterval(),
159         properties.getChatroomBufferSize(),
160         clock,
161         channelMediator,
162         shardingPublisherStrategy);
163   }
164
165   @Bean
166   ChannelMediator channelMediator()
167   {
168     return new ChannelMediator();
169   }
170
171   @Bean
172   Producer<String, AbstractMessageTo>  producer(
173       Properties defaultProducerProperties,
174       ChatBackendProperties chatBackendProperties,
175       StringSerializer stringSerializer,
176       JsonSerializer<AbstractMessageTo> messageSerializer)
177   {
178     Map<String, Object> properties = new HashMap<>();
179     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
180     properties.put(
181         ProducerConfig.CLIENT_ID_CONFIG,
182         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
183     return new KafkaProducer<>(
184         properties,
185         stringSerializer,
186         messageSerializer);
187   }
188
189   @Bean
190   StringSerializer stringSerializer()
191   {
192     return new StringSerializer();
193   }
194
195   @Bean
196   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
197   {
198     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
199     serializer.configure(
200         Map.of(
201             JsonSerializer.TYPE_MAPPINGS, typeMappings),
202         false);
203     return serializer;
204   }
205
206   @Bean
207   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
208       Properties defaultConsumerProperties,
209       ChatBackendProperties chatBackendProperties,
210       StringDeserializer stringDeserializer,
211       JsonDeserializer<AbstractMessageTo> messageDeserializer)
212   {
213     Map<String, Object> properties = new HashMap<>();
214     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
215     properties.put(
216         ConsumerConfig.CLIENT_ID_CONFIG,
217         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
218     properties.put(
219         ConsumerConfig.GROUP_ID_CONFIG,
220         "info_channel");
221     return new KafkaConsumer<>(
222         properties,
223         stringDeserializer,
224         messageDeserializer);
225   }
226
227   @Bean
228   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
229       Properties defaultConsumerProperties,
230       ChatBackendProperties chatBackendProperties,
231       StringDeserializer stringDeserializer,
232       JsonDeserializer<AbstractMessageTo> messageDeserializer)
233   {
234     Map<String, Object> properties = new HashMap<>();
235     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
236     properties.put(
237         ConsumerConfig.CLIENT_ID_CONFIG,
238         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
239     properties.put(
240         ConsumerConfig.GROUP_ID_CONFIG,
241         "data_channel");
242     return new KafkaConsumer<>(
243         properties,
244         stringDeserializer,
245         messageDeserializer);
246   }
247
248   @Bean
249   StringDeserializer stringDeserializer()
250   {
251     return new StringDeserializer();
252   }
253
254   @Bean
255   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
256   {
257     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
258     deserializer.configure(
259         Map.of(
260             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
261             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
262         false );
263     return deserializer;
264   }
265
266   @Bean
267   String typeMappings ()
268   {
269     return
270         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
271         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
272   }
273
274   @Bean
275   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
276   {
277     Properties properties = new Properties();
278     properties.setProperty(
279         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
280         chatBackendProperties.getKafka().getBootstrapServers());
281     return properties;
282   }
283
284   @Bean
285   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
286   {
287     Properties properties = new Properties();
288     properties.setProperty(
289         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
290         chatBackendProperties.getKafka().getBootstrapServers());
291     properties.setProperty(
292         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
293         "false");
294     properties.setProperty(
295         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
296         "earliest");
297     return properties;
298   }
299
300   @Bean
301   ShardingPublisherStrategy shardingPublisherStrategy(
302       ChatBackendProperties properties)
303   {
304     String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
305     InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
306     return new HaproxyShardingPublisherStrategy(
307         haproxyAddress,
308         properties.getKafka().getHaproxyMap(),
309         properties.getInstanceId());
310   }
311
312   @Bean
313   ZoneId zoneId()
314   {
315     return ZoneId.systemDefault();
316   }
317 }