WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31
32 @ConditionalOnProperty(
33     prefix = "chat.backend",
34     name = "services",
35     havingValue = "kafka")
36 @Configuration
37 public class KafkaServicesConfiguration
38 {
39   @Bean
40   ConsumerTaskRunner consumerTaskRunner(
41       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42       ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
43   {
44     return new ConsumerTaskRunner(
45         infoChannelConsumerTaskExecutor,
46         dataChannelConsumerTaskExecutor);
47   }
48
49   @Bean
50   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51       ThreadPoolTaskExecutor taskExecutor,
52       InfoChannel infoChannel,
53       Consumer<String, AbstractMessageTo> infoChannelConsumer,
54       WorkAssignor infoChannelWorkAssignor)
55   {
56     return new ConsumerTaskExecutor(
57         taskExecutor,
58         infoChannel,
59         infoChannelConsumer,
60         infoChannelWorkAssignor);
61   }
62
63   @Bean
64   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
65   {
66     return consumer ->
67     {
68       String topic = properties.getKafka().getInfoChannelTopic();
69       List<TopicPartition> partitions = consumer
70           .partitionsFor(topic)
71           .stream()
72           .map(partitionInfo ->
73               new TopicPartition(topic, partitionInfo.partition()))
74           .toList();
75       consumer.assign(partitions);
76     };
77   }
78
79   @Bean
80   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
81       ThreadPoolTaskExecutor taskExecutor,
82       DataChannel dataChannel,
83       Consumer<String, AbstractMessageTo> dataChannelConsumer,
84       WorkAssignor dataChannelWorkAssignor)
85   {
86     return new ConsumerTaskExecutor(
87         taskExecutor,
88         dataChannel,
89         dataChannelConsumer,
90         dataChannelWorkAssignor);
91   }
92
93   @Bean
94   WorkAssignor dataChannelWorkAssignor(
95       ChatBackendProperties properties,
96       DataChannel dataChannel)
97   {
98     return consumer ->
99     {
100       List<String> topics =
101           List.of(properties.getKafka().getDataChannelTopic());
102       consumer.subscribe(topics, dataChannel);
103     };
104   }
105
106   @Bean
107     ChatHomeService kafkaChatHome(
108       ChatBackendProperties properties,
109       InfoChannel infoChannel,
110       DataChannel dataChannel)
111   {
112     return new KafkaChatHomeService(
113         properties.getKafka().getNumPartitions(),
114         infoChannel,
115         dataChannel);
116   }
117
118   @Bean
119   InfoChannel infoChannel(
120       ChatBackendProperties properties,
121       Producer<String, AbstractMessageTo> producer,
122       Consumer<String, AbstractMessageTo> infoChannelConsumer)
123   {
124     return new InfoChannel(
125         properties.getKafka().getInfoChannelTopic(),
126         producer,
127         infoChannelConsumer,
128         properties.getKafka().getInstanceUri());
129   }
130
131   @Bean
132   DataChannel dataChannel(
133       ChatBackendProperties properties,
134       Producer<String, AbstractMessageTo> producer,
135       Consumer<String, AbstractMessageTo> dataChannelConsumer,
136       ZoneId zoneId,
137       Clock clock,
138       InfoChannel infoChannel)
139   {
140     return new DataChannel(
141         properties.getKafka().getDataChannelTopic(),
142         producer,
143         dataChannelConsumer,
144         zoneId,
145         properties.getKafka().getNumPartitions(),
146         properties.getChatroomBufferSize(),
147         clock,
148         infoChannel);
149   }
150
151   @Bean
152   Producer<String, AbstractMessageTo>  producer(
153       Properties defaultProducerProperties,
154       ChatBackendProperties chatBackendProperties,
155       StringSerializer stringSerializer,
156       JsonSerializer<AbstractMessageTo> messageSerializer)
157   {
158     Map<String, Object> properties = new HashMap<>();
159     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
160     properties.put(
161         ProducerConfig.CLIENT_ID_CONFIG,
162         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
163     return new KafkaProducer<>(
164         properties,
165         stringSerializer,
166         messageSerializer);
167   }
168
169   @Bean
170   StringSerializer stringSerializer()
171   {
172     return new StringSerializer();
173   }
174
175   @Bean
176   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
177   {
178     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
179     serializer.configure(
180         Map.of(
181             JsonSerializer.TYPE_MAPPINGS, typeMappings),
182         false);
183     return serializer;
184   }
185
186   @Bean
187   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
188       Properties defaultConsumerProperties,
189       ChatBackendProperties chatBackendProperties,
190       StringDeserializer stringDeserializer,
191       JsonDeserializer<AbstractMessageTo> messageDeserializer)
192   {
193     Map<String, Object> properties = new HashMap<>();
194     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
195     properties.put(
196         ConsumerConfig.CLIENT_ID_CONFIG,
197         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
198     properties.put(
199         ConsumerConfig.GROUP_ID_CONFIG,
200         "info_channel");
201     return new KafkaConsumer<>(
202         properties,
203         stringDeserializer,
204         messageDeserializer);
205   }
206
207   @Bean
208   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
209       Properties defaultConsumerProperties,
210       ChatBackendProperties chatBackendProperties,
211       StringDeserializer stringDeserializer,
212       JsonDeserializer<AbstractMessageTo> messageDeserializer)
213   {
214     Map<String, Object> properties = new HashMap<>();
215     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
216     properties.put(
217         ConsumerConfig.CLIENT_ID_CONFIG,
218         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
219     properties.put(
220         ConsumerConfig.GROUP_ID_CONFIG,
221         "data_channel");
222     return new KafkaConsumer<>(
223         properties,
224         stringDeserializer,
225         messageDeserializer);
226   }
227
228   @Bean
229   StringDeserializer stringDeserializer()
230   {
231     return new StringDeserializer();
232   }
233
234   @Bean
235   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
236   {
237     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
238     deserializer.configure(
239         Map.of(
240             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
241             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
242         false );
243     return deserializer;
244   }
245
246   @Bean
247   String typeMappings ()
248   {
249     return
250         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
251         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
252   }
253
254   @Bean
255   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
256   {
257     Properties properties = new Properties();
258     properties.setProperty(
259         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
260         chatBackendProperties.getKafka().getBootstrapServers());
261     return properties;
262   }
263
264   @Bean
265   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
266   {
267     Properties properties = new Properties();
268     properties.setProperty(
269         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
270         chatBackendProperties.getKafka().getBootstrapServers());
271     properties.setProperty(
272         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
273         "false");
274     properties.setProperty(
275         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
276         "earliest");
277     return properties;
278   }
279
280   @Bean
281   ZoneId zoneId()
282   {
283     return ZoneId.systemDefault();
284   }
285 }