feat: Introduced a configurable instance-id
demos/kafka/chat/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
package de.juplo.kafka.chat.backend.implementation.kafka;

import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


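/**
 * Configuration of the Kafka implementation of the chat-backend services.
 * Only active if the property {@code chat.backend.services} is set to
 * {@code kafka}.
 */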
@ConditionalOnProperty(
    prefix = "chat.backend",
    name = "services",
    havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration
{
  @Bean
  ConsumerTaskRunner consumerTaskRunner(
      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
      InfoChannel infoChannel)
  {
    return new ConsumerTaskRunner(
        infoChannelConsumerTaskExecutor,
        dataChannelConsumerTaskExecutor,
        infoChannel);
  }

  @Bean
  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      InfoChannel infoChannel,
      Consumer<String, AbstractMessageTo> infoChannelConsumer,
      WorkAssignor infoChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        infoChannel,
        infoChannelConsumer,
        infoChannelWorkAssignor);
  }

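  /**
   * Explicitly assigns all partitions of the info-channel topic to the
   * consumer, so that the channel always sees the complete topic instead of
   * relying on group-based partition assignment.
   */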
  @Bean
  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
  {
    return consumer ->
    {
      String topic = properties.getKafka().getInfoChannelTopic();
      List<TopicPartition> partitions = consumer
          .partitionsFor(topic)
          .stream()
          .map(partitionInfo ->
              new TopicPartition(topic, partitionInfo.partition()))
          .toList();
      consumer.assign(partitions);
    };
  }

  @Bean
  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      DataChannel dataChannel,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      WorkAssignor dataChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        dataChannel,
        dataChannelConsumer,
        dataChannelWorkAssignor);
  }

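  /**
   * Subscribes the consumer to the data-channel topic via its consumer
   * group and registers the DataChannel as rebalance listener, so it is
   * notified whenever partitions are assigned or revoked.
   */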
  @Bean
  WorkAssignor dataChannelWorkAssignor(
      ChatBackendProperties properties,
      DataChannel dataChannel)
  {
    return consumer ->
    {
      List<String> topics =
          List.of(properties.getKafka().getDataChannelTopic());
      consumer.subscribe(topics, dataChannel);
    };
  }

  @Bean
  ChatHomeService kafkaChatHome(
      ChatBackendProperties properties,
      InfoChannel infoChannel,
      DataChannel dataChannel)
  {
    return new KafkaChatHomeService(
        properties.getKafka().getNumPartitions(),
        infoChannel,
        dataChannel);
  }

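  /**
   * Channel for the info-channel topic, created with the shared producer,
   * its dedicated consumer and the configured instance URI.
   */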
  @Bean
  InfoChannel infoChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> infoChannelConsumer)
  {
    return new InfoChannel(
        properties.getKafka().getInfoChannelTopic(),
        producer,
        infoChannelConsumer,
        properties.getKafka().getInstanceUri());
  }

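  /**
   * Channel for the actual chat messages on the data-channel topic. The
   * configurable instance-id is handed to the DataChannel here, along with
   * the number of partitions, the chat-room buffer size, clock and zone-id.
   */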
  @Bean
  DataChannel dataChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      ZoneId zoneId,
      Clock clock,
      InfoChannel infoChannel)
  {
    return new DataChannel(
        properties.getInstanceId(),
        properties.getKafka().getDataChannelTopic(),
        producer,
        dataChannelConsumer,
        zoneId,
        properties.getKafka().getNumPartitions(),
        properties.getChatroomBufferSize(),
        clock,
        infoChannel);
  }

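  /**
   * Shared producer used by both channels; its client-id is derived from
   * the configured client-id prefix.
   */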
  @Bean
  Producer<String, AbstractMessageTo> producer(
      Properties defaultProducerProperties,
      ChatBackendProperties chatBackendProperties,
      StringSerializer stringSerializer,
      JsonSerializer<AbstractMessageTo> messageSerializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ProducerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
    return new KafkaProducer<>(
        properties,
        stringSerializer,
        messageSerializer);
  }

  @Bean
  StringSerializer stringSerializer()
  {
    return new StringSerializer();
  }

  @Bean
  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
  {
    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
    serializer.configure(
        Map.of(
            JsonSerializer.TYPE_MAPPINGS, typeMappings),
        false);
    return serializer;
  }

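  /**
   * Consumer for the info-channel. It gets a client-id derived from the
   * configured prefix and the fixed group-id "info_channel"; the actual
   * partitions are assigned explicitly by the infoChannelWorkAssignor.
   */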
  @Bean
  Consumer<String, AbstractMessageTo> infoChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "info_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

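  /**
   * Consumer for the data-channel. It joins the consumer group
   * "data_channel", so the partitions of the data-channel topic are
   * distributed among the running instances by the group coordinator.
   */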
  @Bean
  Consumer<String, AbstractMessageTo> dataChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "data_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  StringDeserializer stringDeserializer()
  {
    return new StringDeserializer();
  }

  @Bean
  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
  {
    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
    deserializer.configure(
        Map.of(
            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
        false);
    return deserializer;
  }

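  /**
   * Maps the logical type names used on the wire to the corresponding
   * event classes, so that serializer and deserializer agree on the JSON
   * type information.
   */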
  @Bean
  String typeMappings()
  {
    return
        "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
  }

  @Bean
  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    return properties;
  }

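  /**
   * Common consumer settings: auto-commit is disabled and consumption
   * starts at the earliest available offset if no committed offset exists.
   */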
  @Bean
  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    properties.setProperty(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        "false");
    properties.setProperty(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest");
    return properties;
  }

  @Bean
  ZoneId zoneId()
  {
    return ZoneId.systemDefault();
  }
}
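
The ChatBackendProperties class itself is not part of this file. As a rough orientation, the following sketch shows the shape it presumably has, inferred only from the getters called in the configuration above; the field names and types, the @ConfigurationProperties prefix and the nested class name are assumptions, and the setters needed for property binding are omitted for brevity:

package de.juplo.kafka.chat.backend;

import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical sketch, inferred from the getters used in
// KafkaServicesConfiguration -- the real class may differ.
@ConfigurationProperties("chat.backend")
public class ChatBackendProperties
{
  private String instanceId;            // the configurable instance-id, passed to the DataChannel
  private int chatroomBufferSize;
  private KafkaProperties kafka = new KafkaProperties();

  public String getInstanceId() { return instanceId; }
  public int getChatroomBufferSize() { return chatroomBufferSize; }
  public KafkaProperties getKafka() { return kafka; }

  public static class KafkaProperties
  {
    private String bootstrapServers;
    private String clientIdPrefix;
    private String instanceUri;         // may well be a java.net.URI in the real code
    private String infoChannelTopic;
    private String dataChannelTopic;
    private int numPartitions;

    public String getBootstrapServers() { return bootstrapServers; }
    public String getClientIdPrefix() { return clientIdPrefix; }
    public String getInstanceUri() { return instanceUri; }
    public String getInfoChannelTopic() { return infoChannelTopic; }
    public String getDataChannelTopic() { return dataChannelTopic; }
    public int getNumPartitions() { return numPartitions; }
  }
}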