fix: The number of shards is the number of partitions of the `data_channel`
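
The subject line states the invariant this fix enforces: the number of shards equals the number of partitions of the `data_channel` topic, i.e. one shard per partition. Purely as an illustrative sketch (nothing below exists in the class; `shardCount` is a hypothetical helper built on `org.apache.kafka.clients.consumer.Consumer`), the same invariant could also be read off the topic itself instead of a separately configured value:

// Hypothetical helper, only to illustrate "number of shards == number of
// partitions of the data channel"; the configuration class below keeps using
// the value from ChatBackendProperties.
static int shardCount(Consumer<?, ?> dataChannelConsumer, String dataChannelTopic)
{
  return dataChannelConsumer.partitionsFor(dataChannelTopic).size();
}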
[demos/kafka/chat] src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
package de.juplo.kafka.chat.backend.implementation.kafka;

import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


@ConditionalOnProperty(
    prefix = "chat.backend",
    name = "services",
    havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration
{
  @Bean
  ConsumerTaskRunner consumerTaskRunner(
      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
      InfoChannel infoChannel)
  {
    return new ConsumerTaskRunner(
        infoChannelConsumerTaskExecutor,
        dataChannelConsumerTaskExecutor,
        infoChannel);
  }

  @Bean
  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      InfoChannel infoChannel,
      Consumer<String, AbstractMessageTo> infoChannelConsumer,
      WorkAssignor infoChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        infoChannel,
        infoChannelConsumer,
        infoChannelWorkAssignor);
  }

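  // Every instance consumes the complete info channel: all partitions of the
  // info-channel topic are assigned explicitly, bypassing consumer-group rebalancing.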
  @Bean
  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
  {
    return consumer ->
    {
      String topic = properties.getKafka().getInfoChannelTopic();
      List<TopicPartition> partitions = consumer
          .partitionsFor(topic)
          .stream()
          .map(partitionInfo ->
              new TopicPartition(topic, partitionInfo.partition()))
          .toList();
      consumer.assign(partitions);
    };
  }

  @Bean
  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      DataChannel dataChannel,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      WorkAssignor dataChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        dataChannel,
        dataChannelConsumer,
        dataChannelWorkAssignor);
  }

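  // The data channel, in contrast, uses a consumer-group subscription: its partitions are
  // distributed among the running instances, with the DataChannel acting as the rebalance
  // listener, so each instance only owns the shards (= partitions) assigned to it.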
  @Bean
  WorkAssignor dataChannelWorkAssignor(
      ChatBackendProperties properties,
      DataChannel dataChannel)
  {
    return consumer ->
    {
      List<String> topics =
          List.of(properties.getKafka().getDataChannelTopic());
      consumer.subscribe(topics, dataChannel);
    };
  }

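  // Number of shards == number of partitions of the data-channel topic (see the commit
  // subject): the configured partition count is passed here as the shard count.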
  @Bean
  KafkaChatHomeService kafkaChatHome(
      ChatBackendProperties properties,
      InfoChannel infoChannel,
      DataChannel dataChannel)
  {
    return new KafkaChatHomeService(
        properties.getKafka().getNumPartitions(),
        infoChannel,
        dataChannel);
  }

  @Bean
  InfoChannel infoChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> infoChannelConsumer)
  {
    return new InfoChannel(
        properties.getKafka().getInfoChannelTopic(),
        producer,
        infoChannelConsumer,
        properties.getKafka().getNumPartitions(),
        properties.getKafka().getInstanceUri());
  }

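  // The DataChannel is built around the data-channel topic with the same partition count;
  // the injected ShardingPublisherStrategy is used to publish which instance currently
  // serves a shard (see the HAProxy-based implementation below).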
  @Bean
  DataChannel dataChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      ZoneId zoneId,
      Clock clock,
      InfoChannel infoChannel,
      ShardingPublisherStrategy shardingPublisherStrategy)
  {
    return new DataChannel(
        properties.getInstanceId(),
        properties.getKafka().getDataChannelTopic(),
        producer,
        dataChannelConsumer,
        zoneId,
        properties.getKafka().getNumPartitions(),
        properties.getChatroomBufferSize(),
        clock,
        infoChannel,
        shardingPublisherStrategy);
  }

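  // A single producer is shared by the InfoChannel and the DataChannel; only the client id
  // is added on top of the default producer properties.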
  @Bean
  Producer<String, AbstractMessageTo> producer(
      Properties defaultProducerProperties,
      ChatBackendProperties chatBackendProperties,
      StringSerializer stringSerializer,
      JsonSerializer<AbstractMessageTo> messageSerializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ProducerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
    return new KafkaProducer<>(
        properties,
        stringSerializer,
        messageSerializer);
  }

  @Bean
  StringSerializer stringSerializer()
  {
    return new StringSerializer();
  }

  @Bean
  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
  {
    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
    serializer.configure(
        Map.of(
            JsonSerializer.TYPE_MAPPINGS, typeMappings),
        false);
    return serializer;
  }

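  // Separate consumers for the two channels: both start from the shared default consumer
  // properties and differ only in client id and group id ("info_channel" vs. "data_channel").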
  @Bean
  Consumer<String, AbstractMessageTo> infoChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "info_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  Consumer<String, AbstractMessageTo> dataChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "data_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  StringDeserializer stringDeserializer()
  {
    return new StringDeserializer();
  }

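  // Mirrors the serializer configuration above: the same type mappings, plus a
  // trusted-packages restriction on this configuration's package.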
  @Bean
  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
  {
    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
    deserializer.configure(
        Map.of(
            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
        false);
    return deserializer;
  }

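  // Logical type names written to the wire, mapped to the concrete event classes;
  // shared by the serializer and the deserializer.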
  @Bean
  String typeMappings()
  {
    return
        "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
  }

  @Bean
  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    return properties;
  }

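  // Shared consumer defaults: auto-commit is disabled and, in the absence of committed
  // offsets, consumption starts at the earliest available offset.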
  @Bean
  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    properties.setProperty(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        "false");
    properties.setProperty(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest");
    return properties;
  }

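  // The HAProxy runtime API address is configured as "host:port"; it is split here and
  // passed, together with the map name and this instance's id, to the HAProxy-based
  // strategy that publishes shard ownership.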
  @Bean
  ShardingPublisherStrategy shardingPublisherStrategy(
      ChatBackendProperties properties)
  {
    String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
    return new HaproxyShardingPublisherStrategy(
        haproxyAddress,
        properties.getKafka().getHaproxyMap(),
        properties.getInstanceId());
  }

  @Bean
  ZoneId zoneId()
  {
    return ZoneId.systemDefault();
  }
}