fix: Errors during shard-publishing should not kill the instance
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Properties;
33
34
35 @ConditionalOnProperty(
36     prefix = "chat.backend",
37     name = "services",
38     havingValue = "kafka")
39 @Configuration
40 public class KafkaServicesConfiguration
41 {
42   @Bean
43   ConsumerTaskRunner consumerTaskRunner(
44       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45       ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46       InfoChannel infoChannel)
47   {
48     return new ConsumerTaskRunner(
49         infoChannelConsumerTaskExecutor,
50         dataChannelConsumerTaskExecutor,
51         infoChannel);
52   }
53
54   @Bean
55   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56       ThreadPoolTaskExecutor taskExecutor,
57       InfoChannel infoChannel,
58       Consumer<String, AbstractMessageTo> infoChannelConsumer,
59       WorkAssignor infoChannelWorkAssignor)
60   {
61     return new ConsumerTaskExecutor(
62         taskExecutor,
63         infoChannel,
64         infoChannelConsumer,
65         infoChannelWorkAssignor);
66   }
67
68   @Bean
69   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70   {
71     return consumer ->
72     {
73       String topic = properties.getKafka().getInfoChannelTopic();
74       List<TopicPartition> partitions = consumer
75           .partitionsFor(topic)
76           .stream()
77           .map(partitionInfo ->
78               new TopicPartition(topic, partitionInfo.partition()))
79           .toList();
80       consumer.assign(partitions);
81     };
82   }
83
84   @Bean
85   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86       ThreadPoolTaskExecutor taskExecutor,
87       DataChannel dataChannel,
88       Consumer<String, AbstractMessageTo> dataChannelConsumer,
89       WorkAssignor dataChannelWorkAssignor)
90   {
91     return new ConsumerTaskExecutor(
92         taskExecutor,
93         dataChannel,
94         dataChannelConsumer,
95         dataChannelWorkAssignor);
96   }
97
98   @Bean
99   WorkAssignor dataChannelWorkAssignor(
100       ChatBackendProperties properties,
101       DataChannel dataChannel)
102   {
103     return consumer ->
104     {
105       List<String> topics =
106           List.of(properties.getKafka().getDataChannelTopic());
107       consumer.subscribe(topics, dataChannel);
108     };
109   }
110
111   @Bean
112   KafkaChatHomeService kafkaChatHome(
113       ChatBackendProperties properties,
114       InfoChannel infoChannel,
115       DataChannel dataChannel)
116   {
117     return new KafkaChatHomeService(
118         properties.getKafka().getNumPartitions(),
119         infoChannel,
120         dataChannel);
121   }
122
123   @Bean
124   InfoChannel infoChannel(
125       ChatBackendProperties properties,
126       Producer<String, AbstractMessageTo> producer,
127       Consumer<String, AbstractMessageTo> infoChannelConsumer)
128   {
129     return new InfoChannel(
130         properties.getKafka().getInfoChannelTopic(),
131         producer,
132         infoChannelConsumer,
133         properties.getKafka().getPollingInterval(),
134         properties.getKafka().getNumPartitions(),
135         properties.getKafka().getInstanceUri());
136   }
137
138   @Bean
139   DataChannel dataChannel(
140       ChatBackendProperties properties,
141       Producer<String, AbstractMessageTo> producer,
142       Consumer<String, AbstractMessageTo> dataChannelConsumer,
143       ZoneId zoneId,
144       Clock clock,
145       InfoChannel infoChannel,
146       ShardingPublisherStrategy shardingPublisherStrategy)
147   {
148     return new DataChannel(
149         properties.getInstanceId(),
150         properties.getKafka().getDataChannelTopic(),
151         producer,
152         dataChannelConsumer,
153         zoneId,
154         properties.getKafka().getNumPartitions(),
155         properties.getKafka().getPollingInterval(),
156         properties.getChatroomBufferSize(),
157         clock,
158         infoChannel,
159         shardingPublisherStrategy);
160   }
161
162   @Bean
163   Producer<String, AbstractMessageTo>  producer(
164       Properties defaultProducerProperties,
165       ChatBackendProperties chatBackendProperties,
166       StringSerializer stringSerializer,
167       JsonSerializer<AbstractMessageTo> messageSerializer)
168   {
169     Map<String, Object> properties = new HashMap<>();
170     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
171     properties.put(
172         ProducerConfig.CLIENT_ID_CONFIG,
173         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
174     return new KafkaProducer<>(
175         properties,
176         stringSerializer,
177         messageSerializer);
178   }
179
180   @Bean
181   StringSerializer stringSerializer()
182   {
183     return new StringSerializer();
184   }
185
186   @Bean
187   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
188   {
189     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
190     serializer.configure(
191         Map.of(
192             JsonSerializer.TYPE_MAPPINGS, typeMappings),
193         false);
194     return serializer;
195   }
196
197   @Bean
198   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
199       Properties defaultConsumerProperties,
200       ChatBackendProperties chatBackendProperties,
201       StringDeserializer stringDeserializer,
202       JsonDeserializer<AbstractMessageTo> messageDeserializer)
203   {
204     Map<String, Object> properties = new HashMap<>();
205     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
206     properties.put(
207         ConsumerConfig.CLIENT_ID_CONFIG,
208         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
209     properties.put(
210         ConsumerConfig.GROUP_ID_CONFIG,
211         "info_channel");
212     return new KafkaConsumer<>(
213         properties,
214         stringDeserializer,
215         messageDeserializer);
216   }
217
218   @Bean
219   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
220       Properties defaultConsumerProperties,
221       ChatBackendProperties chatBackendProperties,
222       StringDeserializer stringDeserializer,
223       JsonDeserializer<AbstractMessageTo> messageDeserializer)
224   {
225     Map<String, Object> properties = new HashMap<>();
226     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
227     properties.put(
228         ConsumerConfig.CLIENT_ID_CONFIG,
229         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
230     properties.put(
231         ConsumerConfig.GROUP_ID_CONFIG,
232         "data_channel");
233     return new KafkaConsumer<>(
234         properties,
235         stringDeserializer,
236         messageDeserializer);
237   }
238
239   @Bean
240   StringDeserializer stringDeserializer()
241   {
242     return new StringDeserializer();
243   }
244
245   @Bean
246   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
247   {
248     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
249     deserializer.configure(
250         Map.of(
251             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
252             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
253         false );
254     return deserializer;
255   }
256
257   @Bean
258   String typeMappings ()
259   {
260     return
261         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
262         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
263   }
264
265   @Bean
266   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
267   {
268     Properties properties = new Properties();
269     properties.setProperty(
270         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
271         chatBackendProperties.getKafka().getBootstrapServers());
272     return properties;
273   }
274
275   @Bean
276   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
277   {
278     Properties properties = new Properties();
279     properties.setProperty(
280         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
281         chatBackendProperties.getKafka().getBootstrapServers());
282     properties.setProperty(
283         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
284         "false");
285     properties.setProperty(
286         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
287         "earliest");
288     return properties;
289   }
290
291   @Bean
292   ShardingPublisherStrategy shardingPublisherStrategy(
293       ChatBackendProperties properties)
294   {
295     String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
296     InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
297     return new HaproxyShardingPublisherStrategy(
298         haproxyAddress,
299         properties.getKafka().getHaproxyMap(),
300         properties.getInstanceId());
301   }
302
303   @Bean
304   ZoneId zoneId()
305   {
306     return ZoneId.systemDefault();
307   }
308 }