refactor: Separated channels for data and info -- Refactored/aligned code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31
32 @ConditionalOnProperty(
33     prefix = "chat.backend",
34     name = "services",
35     havingValue = "kafka")
36 @Configuration
37 public class KafkaServicesConfiguration
38 {
39   @Bean
40   ConsumerTaskRunner consumerTaskRunner(
41       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42       ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
43   {
44     return new ConsumerTaskRunner(
45         infoChannelConsumerTaskExecutor,
46         dataChannelConsumerTaskExecutor);
47   }
48
49   @Bean
50   ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51       ThreadPoolTaskExecutor taskExecutor,
52       InfoChannel infoChannel,
53       Consumer<String, AbstractMessageTo> infoChannelConsumer,
54       ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
55   {
56     return new ConsumerTaskExecutor(
57         taskExecutor,
58         infoChannel,
59         infoChannelConsumer,
60         infoChannelWorkAssignor);
61   }
62
63   @Bean
64   ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
65       ChatBackendProperties properties)
66   {
67     return consumer ->
68     {
69       String topic = properties.getKafka().getInfoChannelTopic();
70       List<TopicPartition> partitions = consumer
71           .partitionsFor(topic)
72           .stream()
73           .map(partitionInfo ->
74               new TopicPartition(topic, partitionInfo.partition()))
75           .toList();
76       consumer.assign(partitions);
77     };
78   }
79
80   @Bean
81   ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
82       ThreadPoolTaskExecutor taskExecutor,
83       DataChannel dataChannel,
84       Consumer<String, AbstractMessageTo> dataChannelConsumer,
85       ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
86   {
87     return new ConsumerTaskExecutor(
88         taskExecutor,
89         dataChannel,
90         dataChannelConsumer,
91         dataChannelWorkAssignor);
92   }
93
94   @Bean
95   ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
96       ChatBackendProperties properties,
97       DataChannel dataChannel)
98   {
99     return consumer ->
100     {
101       List<String> topics =
102           List.of(properties.getKafka().getDataChannelTopic());
103       consumer.subscribe(topics, dataChannel);
104     };
105   }
106
107   @Bean
108     ChatHomeService kafkaChatHome(
109       ChatBackendProperties properties,
110       InfoChannel infoChannel,
111       DataChannel dataChannel)
112   {
113     return new KafkaChatHomeService(
114         properties.getKafka().getNumPartitions(),
115         infoChannel,
116         dataChannel);
117   }
118
119   @Bean
120   InfoChannel infoChannel(
121       ChatBackendProperties properties,
122       Producer<String, AbstractMessageTo> producer,
123       Consumer<String, AbstractMessageTo> infoChannelConsumer)
124   {
125     return new InfoChannel(
126         properties.getKafka().getInfoChannelTopic(),
127         producer,
128         infoChannelConsumer);
129   }
130
131   @Bean
132   DataChannel dataChannel(
133       ChatBackendProperties properties,
134       Producer<String, AbstractMessageTo> producer,
135       Consumer<String, AbstractMessageTo> dataChannelConsumer,
136       ZoneId zoneId,
137       Clock clock)
138   {
139     return new DataChannel(
140         properties.getKafka().getDataChannelTopic(),
141         producer,
142         dataChannelConsumer,
143         zoneId,
144         properties.getKafka().getNumPartitions(),
145         properties.getChatroomBufferSize(),
146         clock);
147   }
148
149   @Bean
150   Producer<String, AbstractMessageTo>  producer(
151       Properties defaultProducerProperties,
152       ChatBackendProperties chatBackendProperties,
153       StringSerializer stringSerializer,
154       JsonSerializer<AbstractMessageTo> messageSerializer)
155   {
156     Map<String, Object> properties = new HashMap<>();
157     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
158     properties.put(
159         ProducerConfig.CLIENT_ID_CONFIG,
160         chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
161     return new KafkaProducer<>(
162         properties,
163         stringSerializer,
164         messageSerializer);
165   }
166
167   @Bean
168   StringSerializer stringSerializer()
169   {
170     return new StringSerializer();
171   }
172
173   @Bean
174   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
175   {
176     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
177     serializer.configure(
178         Map.of(
179             JsonSerializer.TYPE_MAPPINGS, typeMappings),
180         false);
181     return serializer;
182   }
183
184   @Bean
185   Consumer<String, AbstractMessageTo>  infoChannelConsumer(
186       Properties defaultConsumerProperties,
187       ChatBackendProperties chatBackendProperties,
188       StringDeserializer stringDeserializer,
189       JsonDeserializer<AbstractMessageTo> messageDeserializer)
190   {
191     Map<String, Object> properties = new HashMap<>();
192     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
193     properties.put(
194         ConsumerConfig.CLIENT_ID_CONFIG,
195         chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
196     properties.put(
197         ConsumerConfig.GROUP_ID_CONFIG,
198         "info_channel");
199     return new KafkaConsumer<>(
200         properties,
201         stringDeserializer,
202         messageDeserializer);
203   }
204
205   @Bean
206   Consumer<String, AbstractMessageTo>  dataChannelConsumer(
207       Properties defaultConsumerProperties,
208       ChatBackendProperties chatBackendProperties,
209       StringDeserializer stringDeserializer,
210       JsonDeserializer<AbstractMessageTo> messageDeserializer)
211   {
212     Map<String, Object> properties = new HashMap<>();
213     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
214     properties.put(
215         ConsumerConfig.CLIENT_ID_CONFIG,
216         chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
217     properties.put(
218         ConsumerConfig.GROUP_ID_CONFIG,
219         "data_channel");
220     return new KafkaConsumer<>(
221         properties,
222         stringDeserializer,
223         messageDeserializer);
224   }
225
226   @Bean
227   StringDeserializer stringDeserializer()
228   {
229     return new StringDeserializer();
230   }
231
232   @Bean
233   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
234   {
235     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
236     deserializer.configure(
237         Map.of(
238             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
239             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
240         false );
241     return deserializer;
242   }
243
244   @Bean
245   String typeMappings ()
246   {
247     return
248         "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
249         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
250   }
251
252   @Bean
253   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
254   {
255     Properties properties = new Properties();
256     properties.setProperty(
257         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
258         chatBackendProperties.getKafka().getBootstrapServers());
259     return properties;
260   }
261
262   @Bean
263   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
264   {
265     Properties properties = new Properties();
266     properties.setProperty(
267         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
268         chatBackendProperties.getKafka().getBootstrapServers());
269     properties.setProperty(
270         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
271         "false");
272     properties.setProperty(
273         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
274         "earliest");
275     return properties;
276   }
277
278   @Bean
279   ZoneId zoneId()
280   {
281     return ZoneId.systemDefault();
282   }
283 }