fix: `ConsumerTaskRunner` waits until the data-loading is finished
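
The `ConsumerTaskRunner` itself is not part of the file below; it is only wired up there. As a rough, hypothetical sketch of the behaviour named in the title, assuming a start method on `ConsumerTaskExecutor` and a loading flag on `InfoChannel` (both names are assumptions, not the project's actual API), the runner could look roughly like this:

// Hypothetical sketch only: the method names executeConsumerTask() and
// isLoadInProgress() are assumed for illustration and may differ from the
// actual implementation.
public class ConsumerTaskRunner
{
  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
  private final InfoChannel infoChannel;

  public ConsumerTaskRunner(
      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
      InfoChannel infoChannel)
  {
    this.infoChannelConsumerTaskExecutor = infoChannelConsumerTaskExecutor;
    this.dataChannelConsumerTaskExecutor = dataChannelConsumerTaskExecutor;
    this.infoChannel = infoChannel;
  }

  public void run() throws InterruptedException
  {
    // Start the consumer tasks for both channels ...
    infoChannelConsumerTaskExecutor.executeConsumerTask();
    dataChannelConsumerTaskExecutor.executeConsumerTask();
    // ... and wait until the loading has finished (hypothetical flag on the
    // InfoChannel), so that requests are only served after the state is restored.
    while (infoChannel.isLoadInProgress())
    {
      Thread.sleep(1000);
    }
  }
}
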
demos/kafka/chat/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
package de.juplo.kafka.chat.backend.implementation.kafka;

import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

@ConditionalOnProperty(
    prefix = "chat.backend",
    name = "services",
    havingValue = "kafka")
@Configuration
public class KafkaServicesConfiguration
{
  @Bean
  ConsumerTaskRunner consumerTaskRunner(
      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
      InfoChannel infoChannel)
  {
    // Runs the consumer tasks for both channels and (per this fix) waits
    // until the data loading has finished.
    return new ConsumerTaskRunner(
        infoChannelConsumerTaskExecutor,
        dataChannelConsumerTaskExecutor,
        infoChannel);
  }

  @Bean
  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      InfoChannel infoChannel,
      Consumer<String, AbstractMessageTo> infoChannelConsumer,
      WorkAssignor infoChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        infoChannel,
        infoChannelConsumer,
        infoChannelWorkAssignor);
  }

  @Bean
  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
  {
    // The info channel does not use group management: all partitions of the
    // info topic are assigned to the consumer explicitly.
    return consumer ->
    {
      String topic = properties.getKafka().getInfoChannelTopic();
      List<TopicPartition> partitions = consumer
          .partitionsFor(topic)
          .stream()
          .map(partitionInfo ->
              new TopicPartition(topic, partitionInfo.partition()))
          .toList();
      consumer.assign(partitions);
    };
  }

  @Bean
  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
      ThreadPoolTaskExecutor taskExecutor,
      DataChannel dataChannel,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      WorkAssignor dataChannelWorkAssignor)
  {
    return new ConsumerTaskExecutor(
        taskExecutor,
        dataChannel,
        dataChannelConsumer,
        dataChannelWorkAssignor);
  }

  @Bean
  WorkAssignor dataChannelWorkAssignor(
      ChatBackendProperties properties,
      DataChannel dataChannel)
  {
    // The data channel joins a consumer group; the DataChannel itself acts
    // as the rebalance listener for the subscription.
    return consumer ->
    {
      List<String> topics =
          List.of(properties.getKafka().getDataChannelTopic());
      consumer.subscribe(topics, dataChannel);
    };
  }

  @Bean
  ChatHomeService kafkaChatHome(
      ChatBackendProperties properties,
      InfoChannel infoChannel,
      DataChannel dataChannel)
  {
    return new KafkaChatHomeService(
        properties.getKafka().getNumPartitions(),
        infoChannel,
        dataChannel);
  }

  @Bean
  InfoChannel infoChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> infoChannelConsumer)
  {
    return new InfoChannel(
        properties.getKafka().getInfoChannelTopic(),
        producer,
        infoChannelConsumer,
        properties.getKafka().getInstanceUri());
  }

  @Bean
  DataChannel dataChannel(
      ChatBackendProperties properties,
      Producer<String, AbstractMessageTo> producer,
      Consumer<String, AbstractMessageTo> dataChannelConsumer,
      ZoneId zoneId,
      Clock clock,
      InfoChannel infoChannel)
  {
    return new DataChannel(
        properties.getKafka().getDataChannelTopic(),
        producer,
        dataChannelConsumer,
        zoneId,
        properties.getKafka().getNumPartitions(),
        properties.getChatroomBufferSize(),
        clock,
        infoChannel);
  }

  @Bean
  Producer<String, AbstractMessageTo> producer(
      Properties defaultProducerProperties,
      ChatBackendProperties chatBackendProperties,
      StringSerializer stringSerializer,
      JsonSerializer<AbstractMessageTo> messageSerializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ProducerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
    return new KafkaProducer<>(
        properties,
        stringSerializer,
        messageSerializer);
  }

  @Bean
  StringSerializer stringSerializer()
  {
    return new StringSerializer();
  }

  @Bean
  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
  {
    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
    serializer.configure(
        Map.of(
            JsonSerializer.TYPE_MAPPINGS, typeMappings),
        false);
    return serializer;
  }

  @Bean
  Consumer<String, AbstractMessageTo> infoChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "info_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  Consumer<String, AbstractMessageTo> dataChannelConsumer(
      Properties defaultConsumerProperties,
      ChatBackendProperties chatBackendProperties,
      StringDeserializer stringDeserializer,
      JsonDeserializer<AbstractMessageTo> messageDeserializer)
  {
    Map<String, Object> properties = new HashMap<>();
    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
    properties.put(
        ConsumerConfig.CLIENT_ID_CONFIG,
        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
    properties.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        "data_channel");
    return new KafkaConsumer<>(
        properties,
        stringDeserializer,
        messageDeserializer);
  }

  @Bean
  StringDeserializer stringDeserializer()
  {
    return new StringDeserializer();
  }

  @Bean
  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
  {
    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
    deserializer.configure(
        Map.of(
            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
        false);
    return deserializer;
  }

  @Bean
  String typeMappings()
  {
    return
        "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
  }

  @Bean
  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
  {
    Properties properties = new Properties();
    properties.setProperty(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    return properties;
  }

  @Bean
  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
  {
    // Offsets are committed manually; if no committed offsets exist, reading
    // starts at the beginning of the topic.
    Properties properties = new Properties();
    properties.setProperty(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        chatBackendProperties.getKafka().getBootstrapServers());
    properties.setProperty(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        "false");
    properties.setProperty(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest");
    return properties;
  }

  @Bean
  ZoneId zoneId()
  {
    return ZoneId.systemDefault();
  }
}
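
Both work-assignor beans above are lambdas, so the `WorkAssignor` callback they implement is presumably a single-method interface over the Kafka `Consumer`. A hypothetical sketch of that shape (the actual interface in the project may use a different method name):

// Hypothetical sketch of the callback shape implied by the two lambda beans
// above; only the functional-interface form is inferred from their usage.
@FunctionalInterface
public interface WorkAssignor
{
  void assignWork(Consumer<String, AbstractMessageTo> consumer);
}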