fix: GREEN - `DataChannel` creates entries for existent chat-rooms
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31
32 @ConditionalOnProperty(
33     prefix = "chat.backend",
34     name = "services",
35     havingValue = "kafka")
36 @Configuration
37 public class KafkaServicesConfiguration
38 {
39   @Bean
40   ConsumerTaskRunner consumerTaskRunner(
41       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42       ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
43   {
44     return new ConsumerTaskRunner(
45         infoChannelConsumerTaskExecutor,
46         dataChannelConsumerTaskExecutor);
47   }
48
49   @Bean
50   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51       ThreadPoolTaskExecutor taskExecutor,
52       InfoChannel infoChannel,
53       Consumer<String, AbstractMessageTo> infoChannelConsumer,
54       WorkAssignor infoChannelWorkAssignor)
55   {
56     return new ConsumerTaskExecutor(
57         taskExecutor,
58         infoChannel,
59         infoChannelConsumer,
60         infoChannelWorkAssignor);
61   }
62
63   @Bean
64   WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
65   {
66     return consumer ->
67     {
68       String topic = properties.getKafka().getInfoChannelTopic();
69       List<TopicPartition> partitions = consumer
70           .partitionsFor(topic)
71           .stream()
72           .map(partitionInfo ->
73               new TopicPartition(topic, partitionInfo.partition()))
74           .toList();
75       consumer.assign(partitions);
76     };
77   }
78
79   @Bean
80   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
81       ThreadPoolTaskExecutor taskExecutor,
82       DataChannel dataChannel,
83       Consumer<String, AbstractMessageTo> dataChannelConsumer,
84       WorkAssignor dataChannelWorkAssignor)
85   {
86     return new ConsumerTaskExecutor(
87         taskExecutor,
88         dataChannel,
89         dataChannelConsumer,
90         dataChannelWorkAssignor);
91   }
92
93   @Bean
94   WorkAssignor dataChannelWorkAssignor(
95       ChatBackendProperties properties,
96       DataChannel dataChannel)
97   {
98     return consumer ->
99     {
100       List<String> topics =
101           List.of(properties.getKafka().getDataChannelTopic());
102       consumer.subscribe(topics, dataChannel);
103     };
104   }
105
106   @Bean
107     ChatHomeService kafkaChatHome(
108       ChatBackendProperties properties,
109       InfoChannel infoChannel,
110       DataChannel dataChannel)
111   {
112     return new KafkaChatHomeService(
113         properties.getKafka().getNumPartitions(),
114         infoChannel,
115         dataChannel);
116   }
117
118   @Bean
119   InfoChannel infoChannel(
120       ChatBackendProperties properties,
121       Producer<String, AbstractMessageTo> producer,
122       Consumer<String, AbstractMessageTo> infoChannelConsumer)
123   {
124     return new InfoChannel(
125         properties.getKafka().getInfoChannelTopic(),
126         producer,
127         infoChannelConsumer);
128   }
129
130   @Bean
131   DataChannel dataChannel(
132       ChatBackendProperties properties,
133       Producer<String, AbstractMessageTo> producer,
134       Consumer<String, AbstractMessageTo> dataChannelConsumer,
135       ZoneId zoneId,
136       Clock clock,
137       InfoChannel infoChannel)
138   {
139     return new DataChannel(
140         properties.getKafka().getDataChannelTopic(),
141         producer,
142         dataChannelConsumer,
143         zoneId,
144         properties.getKafka().getNumPartitions(),
145         properties.getChatroomBufferSize(),
146         clock,
147         infoChannel);
148   }
149
150   @Bean
151   Producer<String, AbstractMessageTo>  producer(
152       Properties defaultProducerProperties,
153       ChatBackendProperties chatBackendProperties,
154       StringSerializer stringSerializer,
155       JsonSerializer<AbstractMessageTo> messageSerializer)
156   {
157     Map<String, Object> properties = new HashMap<>();
158     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
159     properties.put(
160         ProducerConfig.CLIENT_ID_CONFIG,
161         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
162     return new KafkaProducer<>(
163         properties,
164         stringSerializer,
165         messageSerializer);
166   }
167
168   @Bean
169   StringSerializer stringSerializer()
170   {
171     return new StringSerializer();
172   }
173
174   @Bean
175   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
176   {
177     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
178     serializer.configure(
179         Map.of(
180             JsonSerializer.TYPE_MAPPINGS, typeMappings),
181         false);
182     return serializer;
183   }
184
185   @Bean
186   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
187       Properties defaultConsumerProperties,
188       ChatBackendProperties chatBackendProperties,
189       StringDeserializer stringDeserializer,
190       JsonDeserializer<AbstractMessageTo> messageDeserializer)
191   {
192     Map<String, Object> properties = new HashMap<>();
193     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
194     properties.put(
195         ConsumerConfig.CLIENT_ID_CONFIG,
196         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
197     properties.put(
198         ConsumerConfig.GROUP_ID_CONFIG,
199         "info_channel");
200     return new KafkaConsumer<>(
201         properties,
202         stringDeserializer,
203         messageDeserializer);
204   }
205
206   @Bean
207   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
208       Properties defaultConsumerProperties,
209       ChatBackendProperties chatBackendProperties,
210       StringDeserializer stringDeserializer,
211       JsonDeserializer<AbstractMessageTo> messageDeserializer)
212   {
213     Map<String, Object> properties = new HashMap<>();
214     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
215     properties.put(
216         ConsumerConfig.CLIENT_ID_CONFIG,
217         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
218     properties.put(
219         ConsumerConfig.GROUP_ID_CONFIG,
220         "data_channel");
221     return new KafkaConsumer<>(
222         properties,
223         stringDeserializer,
224         messageDeserializer);
225   }
226
227   @Bean
228   StringDeserializer stringDeserializer()
229   {
230     return new StringDeserializer();
231   }
232
233   @Bean
234   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
235   {
236     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
237     deserializer.configure(
238         Map.of(
239             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
240             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
241         false );
242     return deserializer;
243   }
244
245   @Bean
246   String typeMappings ()
247   {
248     return
249         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
250         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
251   }
252
253   @Bean
254   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
255   {
256     Properties properties = new Properties();
257     properties.setProperty(
258         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
259         chatBackendProperties.getKafka().getBootstrapServers());
260     return properties;
261   }
262
263   @Bean
264   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
265   {
266     Properties properties = new Properties();
267     properties.setProperty(
268         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
269         chatBackendProperties.getKafka().getBootstrapServers());
270     properties.setProperty(
271         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
272         "false");
273     properties.setProperty(
274         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
275         "earliest");
276     return properties;
277   }
278
279   @Bean
280   ZoneId zoneId()
281   {
282     return ZoneId.systemDefault();
283   }
284 }