feat: Introduced interface `ShardingPublisherStrategy`
[demos/kafka/chat] src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
package de.juplo.kafka.chat.backend.implementation.kafka;

import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import reactor.core.publisher.Mono;

import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


@ConditionalOnProperty(
    prefix = "chat.backend",
    name = "services",
    havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration
{
  @Bean
  ConsumerTaskRunner consumerTaskRunner(
      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
      InfoChannel infoChannel)
  {
    return new ConsumerTaskRunner(
        infoChannelConsumerTaskExecutor,
        dataChannelConsumerTaskExecutor,
        infoChannel);
  }

  @Bean
  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      InfoChannel infoChannel,
      Consumer<String, AbstractMessageTo> infoChannelConsumer,
      WorkAssignor infoChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        infoChannel,
        infoChannelConsumer,
        infoChannelWorkAssignor);
  }

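  /**
   * Assigns all partitions of the info-channel topic explicitly, so that every
   * instance consumes the complete info channel instead of relying on
   * consumer-group balancing for it.
   */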
  @Bean
  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
  {
    return consumer ->
    {
      String topic = properties.getKafka().getInfoChannelTopic();
      List<TopicPartition> partitions = consumer
          .partitionsFor(topic)
          .stream()
          .map(partitionInfo ->
              new TopicPartition(topic, partitionInfo.partition()))
          .toList();
      consumer.assign(partitions);
    };
  }

  @Bean
  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      DataChannel dataChannel,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      WorkAssignor dataChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        dataChannel,
        dataChannelConsumer,
        dataChannelWorkAssignor);
  }

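  /**
   * Subscribes the consumer to the data-channel topic via the consumer-group
   * protocol; the {@link DataChannel} is registered as rebalance listener, so
   * it is informed about the partitions (shards) assigned to this instance.
   */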
  @Bean
  WorkAssignor dataChannelWorkAssignor(
      ChatBackendProperties properties,
      DataChannel dataChannel)
  {
    return consumer ->
    {
      List<String> topics =
          List.of(properties.getKafka().getDataChannelTopic());
      consumer.subscribe(topics, dataChannel);
    };
  }

  @Bean
  ChatHomeService kafkaChatHome(
      ChatBackendProperties properties,
      InfoChannel infoChannel,
      DataChannel dataChannel)
  {
    return new KafkaChatHomeService(
        properties.getKafka().getNumPartitions(),
        infoChannel,
        dataChannel);
  }

  @Bean
  InfoChannel infoChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> infoChannelConsumer)
  {
    return new InfoChannel(
        properties.getKafka().getInfoChannelTopic(),
        producer,
        infoChannelConsumer,
        properties.getKafka().getInstanceUri());
  }

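  /**
   * The {@link DataChannel} handles the actual chat messages. It is handed the
   * newly introduced {@link ShardingPublisherStrategy}, presumably so that it
   * can publish the ownership of a shard once the shard has been assigned to
   * this instance.
   */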
  @Bean
  DataChannel dataChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      ZoneId zoneId,
      Clock clock,
      InfoChannel infoChannel,
      ShardingPublisherStrategy shardingPublisherStrategy)
  {
    return new DataChannel(
        properties.getInstanceId(),
        properties.getKafka().getDataChannelTopic(),
        producer,
        dataChannelConsumer,
        zoneId,
        properties.getKafka().getNumPartitions(),
        properties.getChatroomBufferSize(),
        clock,
        infoChannel,
        shardingPublisherStrategy);
  }

  @Bean
  Producer<String, AbstractMessageTo> producer(
      Properties defaultProducerProperties,
      ChatBackendProperties chatBackendProperties,
      StringSerializer stringSerializer,
      JsonSerializer<AbstractMessageTo> messageSerializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ProducerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
    return new KafkaProducer<>(
        properties,
        stringSerializer,
        messageSerializer);
  }

  @Bean
  StringSerializer stringSerializer()
  {
    return new StringSerializer();
  }

  @Bean
  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
  {
    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
    serializer.configure(
        Map.of(
            JsonSerializer.TYPE_MAPPINGS, typeMappings),
        false);
    return serializer;
  }

  @Bean
  Consumer<String, AbstractMessageTo> infoChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "info_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  Consumer<String, AbstractMessageTo> dataChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "data_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  StringDeserializer stringDeserializer()
  {
    return new StringDeserializer();
  }

  @Bean
  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
  {
    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
    deserializer.configure(
        Map.of(
            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
        false);
    return deserializer;
  }

  @Bean
  String typeMappings()
  {
    return
        "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
  }

  @Bean
  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    return properties;
  }

  @Bean
  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    properties.setProperty(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        "false");
    properties.setProperty(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest");
    return properties;
  }

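  /**
   * Default {@link ShardingPublisherStrategy}: it merely echoes the shard number
   * back as a {@link Mono} and does not publish the ownership anywhere. A
   * deployment that needs the sharding information elsewhere would have to
   * replace this bean.
   */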
  @Bean
  ShardingPublisherStrategy shardingPublisherStrategy()
  {
    return new ShardingPublisherStrategy() {
      @Override
      public Mono<String> publishOwnership(int shard)
      {
        return Mono.just(Integer.toString(shard));
      }
    };
  }

  @Bean
  ZoneId zoneId()
  {
    return ZoneId.systemDefault();
  }
}
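The interface `de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy` itself is not part of this listing. Judging from the import and from the default bean above, it boils down to a single reactive method; the following is a minimal sketch derived from that usage, not the actual source file:

package de.juplo.kafka.chat.backend.domain;

import reactor.core.publisher.Mono;


public interface ShardingPublisherStrategy
{
  /**
   * Publishes that the given shard is now owned by this instance.
   *
   * @param shard the number of the shard that was taken over
   * @return a Mono that emits a confirmation value once the ownership was published
   */
  Mono<String> publishOwnership(int shard);
}

The anonymous class returned by `shardingPublisherStrategy()` above is one trivial implementation of this contract.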