WIP:haproxy
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Properties;
33
34
35 @ConditionalOnProperty(
36     prefix = "chat.backend",
37     name = "services",
38     havingValue = "kafka")
39 @Configuration
40 public class KafkaServicesConfiguration
41 {
42   @Bean
43   ConsumerTaskRunner consumerTaskRunner(
44       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45       ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46       InfoChannel infoChannel)
47   {
48     return new ConsumerTaskRunner(
49         infoChannelConsumerTaskExecutor,
50         dataChannelConsumerTaskExecutor,
51         infoChannel);
52   }
53
54   @Bean
55   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56       ThreadPoolTaskExecutor taskExecutor,
57       InfoChannel infoChannel,
58       Consumer<String, AbstractMessageTo> infoChannelConsumer,
59       WorkAssignor infoChannelWorkAssignor)
60   {
61     return new ConsumerTaskExecutor(
62         taskExecutor,
63         infoChannel,
64         infoChannelConsumer,
65         infoChannelWorkAssignor);
66   }
67
68   @Bean
69   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70   {
71     return consumer ->
72     {
73       String topic = properties.getKafka().getInfoChannelTopic();
74       List<TopicPartition> partitions = consumer
75           .partitionsFor(topic)
76           .stream()
77           .map(partitionInfo ->
78               new TopicPartition(topic, partitionInfo.partition()))
79           .toList();
80       consumer.assign(partitions);
81     };
82   }
83
84   @Bean
85   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86       ThreadPoolTaskExecutor taskExecutor,
87       DataChannel dataChannel,
88       Consumer<String, AbstractMessageTo> dataChannelConsumer,
89       WorkAssignor dataChannelWorkAssignor)
90   {
91     return new ConsumerTaskExecutor(
92         taskExecutor,
93         dataChannel,
94         dataChannelConsumer,
95         dataChannelWorkAssignor);
96   }
97
98   @Bean
99   WorkAssignor dataChannelWorkAssignor(
100       ChatBackendProperties properties,
101       DataChannel dataChannel)
102   {
103     return consumer ->
104     {
105       List<String> topics =
106           List.of(properties.getKafka().getDataChannelTopic());
107       consumer.subscribe(topics, dataChannel);
108     };
109   }
110
111   @Bean
112     ChatHomeService kafkaChatHome(
113       ChatBackendProperties properties,
114       InfoChannel infoChannel,
115       DataChannel dataChannel)
116   {
117     return new KafkaChatHomeService(
118         properties.getKafka().getNumPartitions(),
119         infoChannel,
120         dataChannel);
121   }
122
123   @Bean
124   InfoChannel infoChannel(
125       ChatBackendProperties properties,
126       Producer<String, AbstractMessageTo> producer,
127       Consumer<String, AbstractMessageTo> infoChannelConsumer)
128   {
129     return new InfoChannel(
130         properties.getKafka().getInfoChannelTopic(),
131         producer,
132         infoChannelConsumer,
133         properties.getKafka().getInstanceUri());
134   }
135
136   @Bean
137   DataChannel dataChannel(
138       ChatBackendProperties properties,
139       Producer<String, AbstractMessageTo> producer,
140       Consumer<String, AbstractMessageTo> dataChannelConsumer,
141       ZoneId zoneId,
142       Clock clock,
143       InfoChannel infoChannel,
144       ShardingPublisherStrategy shardingPublisherStrategy)
145   {
146     return new DataChannel(
147         properties.getInstanceId(),
148         properties.getKafka().getDataChannelTopic(),
149         producer,
150         dataChannelConsumer,
151         zoneId,
152         properties.getKafka().getNumPartitions(),
153         properties.getChatroomBufferSize(),
154         clock,
155         infoChannel,
156         shardingPublisherStrategy);
157   }
158
159   @Bean
160   Producer<String, AbstractMessageTo>  producer(
161       Properties defaultProducerProperties,
162       ChatBackendProperties chatBackendProperties,
163       StringSerializer stringSerializer,
164       JsonSerializer<AbstractMessageTo> messageSerializer)
165   {
166     Map<String, Object> properties = new HashMap<>();
167     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
168     properties.put(
169         ProducerConfig.CLIENT_ID_CONFIG,
170         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
171     return new KafkaProducer<>(
172         properties,
173         stringSerializer,
174         messageSerializer);
175   }
176
177   @Bean
178   StringSerializer stringSerializer()
179   {
180     return new StringSerializer();
181   }
182
183   @Bean
184   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
185   {
186     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
187     serializer.configure(
188         Map.of(
189             JsonSerializer.TYPE_MAPPINGS, typeMappings),
190         false);
191     return serializer;
192   }
193
194   @Bean
195   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
196       Properties defaultConsumerProperties,
197       ChatBackendProperties chatBackendProperties,
198       StringDeserializer stringDeserializer,
199       JsonDeserializer<AbstractMessageTo> messageDeserializer)
200   {
201     Map<String, Object> properties = new HashMap<>();
202     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
203     properties.put(
204         ConsumerConfig.CLIENT_ID_CONFIG,
205         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
206     properties.put(
207         ConsumerConfig.GROUP_ID_CONFIG,
208         "info_channel");
209     return new KafkaConsumer<>(
210         properties,
211         stringDeserializer,
212         messageDeserializer);
213   }
214
215   @Bean
216   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
217       Properties defaultConsumerProperties,
218       ChatBackendProperties chatBackendProperties,
219       StringDeserializer stringDeserializer,
220       JsonDeserializer<AbstractMessageTo> messageDeserializer)
221   {
222     Map<String, Object> properties = new HashMap<>();
223     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
224     properties.put(
225         ConsumerConfig.CLIENT_ID_CONFIG,
226         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
227     properties.put(
228         ConsumerConfig.GROUP_ID_CONFIG,
229         "data_channel");
230     return new KafkaConsumer<>(
231         properties,
232         stringDeserializer,
233         messageDeserializer);
234   }
235
236   @Bean
237   StringDeserializer stringDeserializer()
238   {
239     return new StringDeserializer();
240   }
241
242   @Bean
243   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
244   {
245     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
246     deserializer.configure(
247         Map.of(
248             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
249             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
250         false );
251     return deserializer;
252   }
253
254   @Bean
255   String typeMappings ()
256   {
257     return
258         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
259         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
260   }
261
262   @Bean
263   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
264   {
265     Properties properties = new Properties();
266     properties.setProperty(
267         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
268         chatBackendProperties.getKafka().getBootstrapServers());
269     return properties;
270   }
271
272   @Bean
273   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
274   {
275     Properties properties = new Properties();
276     properties.setProperty(
277         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
278         chatBackendProperties.getKafka().getBootstrapServers());
279     properties.setProperty(
280         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
281         "false");
282     properties.setProperty(
283         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
284         "earliest");
285     return properties;
286   }
287
288   @Bean
289   ShardingPublisherStrategy shardingPublisherStrategy(
290       ChatBackendProperties properties)
291   {
292     String[] parts = properties.getHaproxyRuntimeApi().split(":");
293     InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
294     return new HaproxyShardingPublisherStrategy(
295         haproxyAddress,
296         properties.getHaproxyMap(),
297         properties.getInstanceId());
298   }
299
300   @Bean
301   ZoneId zoneId()
302   {
303     return ZoneId.systemDefault();
304   }
305 }