refactor: Extracted interface `WorkAssignor` into separate file
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31
32 @ConditionalOnProperty(
33     prefix = "chat.backend",
34     name = "services",
35     havingValue = "kafka")
36 @Configuration
37 public class KafkaServicesConfiguration
38 {
39   @Bean
40   ConsumerTaskRunner consumerTaskRunner(
41       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42       ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
43   {
44     return new ConsumerTaskRunner(
45         infoChannelConsumerTaskExecutor,
46         dataChannelConsumerTaskExecutor);
47   }
48
49   @Bean
50   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51       ThreadPoolTaskExecutor taskExecutor,
52       InfoChannel infoChannel,
53       Consumer<String, AbstractMessageTo> infoChannelConsumer,
54       WorkAssignor infoChannelWorkAssignor)
55   {
56     return new ConsumerTaskExecutor(
57         taskExecutor,
58         infoChannel,
59         infoChannelConsumer,
60         infoChannelWorkAssignor);
61   }
62
63   @Bean
64   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
65   {
66     return consumer ->
67     {
68       String topic = properties.getKafka().getInfoChannelTopic();
69       List<TopicPartition> partitions = consumer
70           .partitionsFor(topic)
71           .stream()
72           .map(partitionInfo ->
73               new TopicPartition(topic, partitionInfo.partition()))
74           .toList();
75       consumer.assign(partitions);
76     };
77   }
78
79   @Bean
80   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
81       ThreadPoolTaskExecutor taskExecutor,
82       DataChannel dataChannel,
83       Consumer<String, AbstractMessageTo> dataChannelConsumer,
84       WorkAssignor dataChannelWorkAssignor)
85   {
86     return new ConsumerTaskExecutor(
87         taskExecutor,
88         dataChannel,
89         dataChannelConsumer,
90         dataChannelWorkAssignor);
91   }
92
93   @Bean
94   WorkAssignor dataChannelWorkAssignor(
95       ChatBackendProperties properties,
96       DataChannel dataChannel)
97   {
98     return consumer ->
99     {
100       List<String> topics =
101           List.of(properties.getKafka().getDataChannelTopic());
102       consumer.subscribe(topics, dataChannel);
103     };
104   }
105
106   @Bean
107     ChatHomeService kafkaChatHome(
108       ChatBackendProperties properties,
109       InfoChannel infoChannel,
110       DataChannel dataChannel)
111   {
112     return new KafkaChatHomeService(
113         properties.getKafka().getNumPartitions(),
114         infoChannel,
115         dataChannel);
116   }
117
118   @Bean
119   InfoChannel infoChannel(
120       ChatBackendProperties properties,
121       Producer<String, AbstractMessageTo> producer,
122       Consumer<String, AbstractMessageTo> infoChannelConsumer)
123   {
124     return new InfoChannel(
125         properties.getKafka().getInfoChannelTopic(),
126         producer,
127         infoChannelConsumer);
128   }
129
130   @Bean
131   DataChannel dataChannel(
132       ChatBackendProperties properties,
133       Producer<String, AbstractMessageTo> producer,
134       Consumer<String, AbstractMessageTo> dataChannelConsumer,
135       ZoneId zoneId,
136       Clock clock)
137   {
138     return new DataChannel(
139         properties.getKafka().getDataChannelTopic(),
140         producer,
141         dataChannelConsumer,
142         zoneId,
143         properties.getKafka().getNumPartitions(),
144         properties.getChatroomBufferSize(),
145         clock);
146   }
147
148   @Bean
149   Producer<String, AbstractMessageTo>  producer(
150       Properties defaultProducerProperties,
151       ChatBackendProperties chatBackendProperties,
152       StringSerializer stringSerializer,
153       JsonSerializer<AbstractMessageTo> messageSerializer)
154   {
155     Map<String, Object> properties = new HashMap<>();
156     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
157     properties.put(
158         ProducerConfig.CLIENT_ID_CONFIG,
159         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
160     return new KafkaProducer<>(
161         properties,
162         stringSerializer,
163         messageSerializer);
164   }
165
166   @Bean
167   StringSerializer stringSerializer()
168   {
169     return new StringSerializer();
170   }
171
172   @Bean
173   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
174   {
175     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
176     serializer.configure(
177         Map.of(
178             JsonSerializer.TYPE_MAPPINGS, typeMappings),
179         false);
180     return serializer;
181   }
182
183   @Bean
184   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
185       Properties defaultConsumerProperties,
186       ChatBackendProperties chatBackendProperties,
187       StringDeserializer stringDeserializer,
188       JsonDeserializer<AbstractMessageTo> messageDeserializer)
189   {
190     Map<String, Object> properties = new HashMap<>();
191     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
192     properties.put(
193         ConsumerConfig.CLIENT_ID_CONFIG,
194         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
195     properties.put(
196         ConsumerConfig.GROUP_ID_CONFIG,
197         "info_channel");
198     return new KafkaConsumer<>(
199         properties,
200         stringDeserializer,
201         messageDeserializer);
202   }
203
204   @Bean
205   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
206       Properties defaultConsumerProperties,
207       ChatBackendProperties chatBackendProperties,
208       StringDeserializer stringDeserializer,
209       JsonDeserializer<AbstractMessageTo> messageDeserializer)
210   {
211     Map<String, Object> properties = new HashMap<>();
212     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
213     properties.put(
214         ConsumerConfig.CLIENT_ID_CONFIG,
215         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
216     properties.put(
217         ConsumerConfig.GROUP_ID_CONFIG,
218         "data_channel");
219     return new KafkaConsumer<>(
220         properties,
221         stringDeserializer,
222         messageDeserializer);
223   }
224
225   @Bean
226   StringDeserializer stringDeserializer()
227   {
228     return new StringDeserializer();
229   }
230
231   @Bean
232   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
233   {
234     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
235     deserializer.configure(
236         Map.of(
237             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
238             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
239         false );
240     return deserializer;
241   }
242
243   @Bean
244   String typeMappings ()
245   {
246     return
247         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
248         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
249   }
250
251   @Bean
252   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
253   {
254     Properties properties = new Properties();
255     properties.setProperty(
256         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
257         chatBackendProperties.getKafka().getBootstrapServers());
258     return properties;
259   }
260
261   @Bean
262   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
263   {
264     Properties properties = new Properties();
265     properties.setProperty(
266         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
267         chatBackendProperties.getKafka().getBootstrapServers());
268     properties.setProperty(
269         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
270         "false");
271     properties.setProperty(
272         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
273         "earliest");
274     return properties;
275   }
276
277   @Bean
278   ZoneId zoneId()
279   {
280     return ZoneId.systemDefault();
281   }
282 }