1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25 import java.net.InetSocketAddress;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.Properties;
/**
 * Groups the factory methods that wire up the Kafka-based implementation of
 * the chat backend (channels, task executors, producer, consumers and
 * (de)serializers).
 * <p>
 * The class is only active when a property below the {@code chat.backend}
 * prefix has the value {@code kafka}.
 * NOTE(review): the name of that property is not visible in this excerpt -
 * confirm which key selects this configuration.
 */
34 @ConditionalOnProperty(
35 prefix = "chat.backend",
37 havingValue = "kafka")
39 public class KafkaServicesConfiguration
/**
 * Creates the {@code KafkaServicesThreadPoolTaskExecutorCustomizer} using its
 * no-argument constructor.
 *
 * @return a new customizer instance
 */
42 KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
44 return new KafkaServicesThreadPoolTaskExecutorCustomizer();
/**
 * Creates the {@code ChannelTaskRunner} that is handed both channel task
 * executors.
 *
 * @param infoChannelTaskExecutor the executor for the info channel (passed first)
 * @param dataChannelTaskExecutor the executor for the data channel (passed second)
 * @return a new runner built from the two executors
 */
48 ChannelTaskRunner channelTaskRunner(
49 ChannelTaskExecutor infoChannelTaskExecutor,
50 ChannelTaskExecutor dataChannelTaskExecutor)
52 return new ChannelTaskRunner(
53 infoChannelTaskExecutor,
54 dataChannelTaskExecutor);
/**
 * Creates the {@code ChannelTaskExecutor} for the info channel.
 * <p>
 * The bean is registered with {@code destroyMethod = "join"}, so the
 * container invokes {@code join()} on the executor when the bean is destroyed.
 * NOTE(review): the leading constructor arguments are not visible in this
 * excerpt; only the trailing {@code infoChannelWorkAssignor} is shown.
 *
 * @param taskExecutor the thread pool task executor
 * @param infoChannel the info channel
 * @param infoChannelConsumer the Kafka consumer dedicated to the info channel
 * @param infoChannelWorkAssignor the work assignor for the info channel, passed as last constructor argument
 * @return a new task executor for the info channel
 */
57 @Bean(destroyMethod = "join")
58 ChannelTaskExecutor infoChannelTaskExecutor(
59 ThreadPoolTaskExecutor taskExecutor,
60 InfoChannel infoChannel,
61 Consumer<String, AbstractMessageTo> infoChannelConsumer,
62 WorkAssignor infoChannelWorkAssignor)
64 return new ChannelTaskExecutor(
68 infoChannelWorkAssignor);
/**
 * Creates the {@code WorkAssignor} for the info channel.
 * <p>
 * The visible code reads the info channel topic from the properties, builds
 * one {@code TopicPartition} per partition info of that topic and assigns
 * the resulting list to the consumer via {@code assign()}.
 * NOTE(review): the declaration of {@code consumer} and the call that yields
 * the partition infos are not visible in this excerpt - presumably
 * {@code consumer} is the parameter of a lambda returned as the assignor;
 * confirm against the full source.
 *
 * @param properties the chat backend properties, source of the info channel topic name
 * @return the work assignor for the info channel
 */
72 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
76 String topic = properties.getKafka().getInfoChannelTopic();
77 List<TopicPartition> partitions = consumer
// Every partition info is mapped onto a TopicPartition of the info channel topic
81 new TopicPartition(topic, partitionInfo.partition()))
// Explicit assignment of the computed partition list (no subscribe() call here)
83 consumer.assign(partitions);
/**
 * Creates the {@code ChannelTaskExecutor} for the data channel.
 * <p>
 * The bean is registered with {@code destroyMethod = "join"}, so the
 * container invokes {@code join()} on the executor when the bean is destroyed.
 * NOTE(review): the leading constructor arguments are not visible in this
 * excerpt; only the trailing {@code dataChannelWorkAssignor} is shown.
 *
 * @param taskExecutor the thread pool task executor
 * @param dataChannel the data channel
 * @param dataChannelConsumer the Kafka consumer dedicated to the data channel
 * @param dataChannelWorkAssignor the work assignor for the data channel, passed as last constructor argument
 * @return a new task executor for the data channel
 */
87 @Bean(destroyMethod = "join")
88 ChannelTaskExecutor dataChannelTaskExecutor(
89 ThreadPoolTaskExecutor taskExecutor,
90 DataChannel dataChannel,
91 Consumer<String, AbstractMessageTo> dataChannelConsumer,
92 WorkAssignor dataChannelWorkAssignor)
94 return new ChannelTaskExecutor(
98 dataChannelWorkAssignor);
/**
 * Creates the {@code WorkAssignor} for the data channel.
 * <p>
 * The visible code subscribes the consumer to a single-element list holding
 * the data channel topic and passes {@code dataChannel} as second argument
 * of {@code subscribe()}.
 * NOTE(review): the declaration of {@code consumer} is not visible in this
 * excerpt - presumably it is the parameter of a lambda returned as the
 * assignor, and {@code dataChannel} presumably acts as rebalance listener;
 * confirm against the full source.
 *
 * @param properties the chat backend properties, source of the data channel topic name
 * @param dataChannel the data channel, handed to {@code subscribe()} together with the topic list
 * @return the work assignor for the data channel
 */
102 WorkAssignor dataChannelWorkAssignor(
103 ChatBackendProperties properties,
104 DataChannel dataChannel)
108 List<String> topics =
109 List.of(properties.getKafka().getDataChannelTopic());
110 consumer.subscribe(topics, dataChannel);
/**
 * Creates the {@code KafkaChatHomeService}.
 * <p>
 * The configured number of partitions is passed as first constructor
 * argument.
 * NOTE(review): the remaining constructor arguments are not visible in this
 * excerpt.
 *
 * @param properties the chat backend properties, source of the number of partitions
 * @param infoChannel the info channel
 * @param dataChannel the data channel
 * @return a new chat home service
 */
115 KafkaChatHomeService kafkaChatHome(
116 ChatBackendProperties properties,
117 InfoChannel infoChannel,
118 DataChannel dataChannel)
120 return new KafkaChatHomeService(
121 properties.getKafka().getNumPartitions(),
/**
 * Creates the {@code InfoChannel} and registers it with the
 * {@code ChannelMediator}.
 * <p>
 * Among the visible constructor arguments are the info channel topic, the
 * polling interval, the number of partitions and the instance URI, in that
 * order.
 * NOTE(review): some constructor arguments are not visible in this excerpt.
 *
 * @param properties the chat backend properties
 * @param producer the shared Kafka producer
 * @param infoChannelConsumer the Kafka consumer dedicated to the info channel
 * @param channelMediator the mediator that receives the newly created channel
 * @return the info channel
 */
127 InfoChannel infoChannel(
128 ChatBackendProperties properties,
129 Producer<String, AbstractMessageTo> producer,
130 Consumer<String, AbstractMessageTo> infoChannelConsumer,
131 ChannelMediator channelMediator)
133 InfoChannel infoChannel = new InfoChannel(
134 properties.getKafka().getInfoChannelTopic(),
137 properties.getKafka().getPollingInterval(),
138 properties.getKafka().getNumPartitions(),
139 properties.getKafka().getInstanceUri(),
// Side effect of creating the channel: the mediator is told about the new instance
141 channelMediator.setInfoChannel(infoChannel);
/**
 * Creates the {@code DataChannel} and registers it with the
 * {@code ChannelMediator}.
 * <p>
 * Among the visible constructor arguments are the instance id, the data
 * channel topic, the number of partitions, the polling interval, the chat
 * room buffer size and, as last argument, the sharding publisher strategy.
 * NOTE(review): some parameters and constructor arguments are not visible in
 * this excerpt. Also note that the number of partitions precedes the polling
 * interval here, whereas the {@code InfoChannel} constructor call in this
 * class passes them the other way round - verify both against the
 * constructor signatures.
 *
 * @param properties the chat backend properties
 * @param producer the shared Kafka producer
 * @param dataChannelConsumer the Kafka consumer dedicated to the data channel
 * @param channelMediator the mediator that receives the newly created channel
 * @param shardingPublisherStrategy the strategy passed as last constructor argument
 * @return the data channel
 */
146 DataChannel dataChannel(
147 ChatBackendProperties properties,
148 Producer<String, AbstractMessageTo> producer,
149 Consumer<String, AbstractMessageTo> dataChannelConsumer,
152 ChannelMediator channelMediator,
153 ShardingPublisherStrategy shardingPublisherStrategy)
155 DataChannel dataChannel = new DataChannel(
156 properties.getInstanceId(),
157 properties.getKafka().getDataChannelTopic(),
161 properties.getKafka().getNumPartitions(),
162 properties.getKafka().getPollingInterval(),
163 properties.getChatroomBufferSize(),
166 shardingPublisherStrategy);
// Side effect of creating the channel: the mediator is told about the new instance
167 channelMediator.setDataChannel(dataChannel);
/**
 * Creates the {@code ChannelMediator} using its no-argument constructor.
 * The info and the data channel register themselves with it in their
 * respective factory methods of this class.
 *
 * @return a new, still unconnected mediator
 */
172 ChannelMediator channelMediator()
174 return new ChannelMediator();
/**
 * Creates the Kafka producer used to send messages.
 * <p>
 * The default producer properties are copied into a fresh map (keys are
 * converted with {@code toString()}), and the client id is set to the
 * configured client id prefix followed by {@code _PRODUCER}.
 * NOTE(review): the arguments of the {@code KafkaProducer} constructor are
 * not visible in this excerpt.
 *
 * @param defaultProducerProperties the base configuration shared by producers
 * @param chatBackendProperties the chat backend properties, source of the client id prefix
 * @param stringSerializer the serializer for the record keys
 * @param messageSerializer the serializer for the record values
 * @return a new Kafka producer
 */
178 Producer<String, AbstractMessageTo> producer(
179 Properties defaultProducerProperties,
180 ChatBackendProperties chatBackendProperties,
181 StringSerializer stringSerializer,
182 JsonSerializer<AbstractMessageTo> messageSerializer)
184 Map<String, Object> properties = new HashMap<>();
// Copy the defaults, so that the shared Properties instance is not modified
185 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
187 ProducerConfig.CLIENT_ID_CONFIG,
188 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
189 return new KafkaProducer<>(
/**
 * Creates a Kafka {@link StringSerializer}.
 *
 * @return a new string serializer
 */
196 StringSerializer stringSerializer()
198 return new StringSerializer();
/**
 * Creates the JSON serializer for {@code AbstractMessageTo} values and
 * configures it with the given type mappings.
 * NOTE(review): further configuration entries and the second argument of
 * {@code configure()} are not visible in this excerpt.
 *
 * @param typeMappings the type mappings, set under {@code JsonSerializer.TYPE_MAPPINGS}
 * @return the configured serializer
 */
202 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
204 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
205 serializer.configure(
207 JsonSerializer.TYPE_MAPPINGS, typeMappings),
/**
 * Creates the Kafka consumer for the info channel.
 * <p>
 * The default consumer properties are copied into a fresh map (keys are
 * converted with {@code toString()}), the client id is set to the
 * configured client id prefix followed by {@code _INFO_CHANNEL_CONSUMER},
 * and a group id is set.
 * NOTE(review): the value of the group id and the leading arguments of the
 * {@code KafkaConsumer} constructor are not visible in this excerpt.
 *
 * @param defaultConsumerProperties the base configuration shared by consumers
 * @param chatBackendProperties the chat backend properties, source of the client id prefix
 * @param stringDeserializer the deserializer for the record keys
 * @param messageDeserializer the deserializer for the record values, passed as last constructor argument
 * @return a new Kafka consumer
 */
213 Consumer<String, AbstractMessageTo> infoChannelConsumer(
214 Properties defaultConsumerProperties,
215 ChatBackendProperties chatBackendProperties,
216 StringDeserializer stringDeserializer,
217 JsonDeserializer<AbstractMessageTo> messageDeserializer)
219 Map<String, Object> properties = new HashMap<>();
// Copy the defaults, so that the shared Properties instance is not modified
220 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
222 ConsumerConfig.CLIENT_ID_CONFIG,
223 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
225 ConsumerConfig.GROUP_ID_CONFIG,
227 return new KafkaConsumer<>(
230 messageDeserializer);
/**
 * Creates the Kafka consumer for the data channel.
 * <p>
 * The default consumer properties are copied into a fresh map (keys are
 * converted with {@code toString()}), the client id is set to the
 * configured client id prefix followed by {@code _DATA_CHANNEL_CONSUMER},
 * and a group id is set.
 * NOTE(review): the value of the group id and the leading arguments of the
 * {@code KafkaConsumer} constructor are not visible in this excerpt.
 *
 * @param defaultConsumerProperties the base configuration shared by consumers
 * @param chatBackendProperties the chat backend properties, source of the client id prefix
 * @param stringDeserializer the deserializer for the record keys
 * @param messageDeserializer the deserializer for the record values, passed as last constructor argument
 * @return a new Kafka consumer
 */
234 Consumer<String, AbstractMessageTo> dataChannelConsumer(
235 Properties defaultConsumerProperties,
236 ChatBackendProperties chatBackendProperties,
237 StringDeserializer stringDeserializer,
238 JsonDeserializer<AbstractMessageTo> messageDeserializer)
240 Map<String, Object> properties = new HashMap<>();
// Copy the defaults, so that the shared Properties instance is not modified
241 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
243 ConsumerConfig.CLIENT_ID_CONFIG,
244 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
246 ConsumerConfig.GROUP_ID_CONFIG,
248 return new KafkaConsumer<>(
251 messageDeserializer);
/**
 * Creates a Kafka {@link StringDeserializer}.
 *
 * @return a new string deserializer
 */
255 StringDeserializer stringDeserializer()
257 return new StringDeserializer();
/**
 * Creates the JSON deserializer for {@code AbstractMessageTo} values.
 * <p>
 * The deserializer is configured with the package of this configuration
 * class as trusted package and with the given type mappings.
 * NOTE(review): further configuration entries and the second argument of
 * {@code configure()} are not visible in this excerpt. The message classes
 * imported by this file live in sub-packages of the trusted package -
 * confirm that trust is resolved as intended for them.
 *
 * @param typeMappings the type mappings, set under {@code JsonDeserializer.TYPE_MAPPINGS}
 * @return the configured deserializer
 */
261 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
263 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
264 deserializer.configure(
// Trusted package is the package of this configuration class itself
266 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
267 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
/**
 * Builds the type mappings shared by the JSON serializer and deserializer.
 * <p>
 * The visible expression is a comma-separated list of {@code id:class}
 * pairs: {@code event_chatroom_created} is mapped onto the canonical name of
 * {@code EventChatRoomCreated} and {@code event_chatmessage_received} onto
 * the canonical name of {@code EventChatMessageReceivedTo}.
 *
 * @return the type mappings string
 */
273 String typeMappings ()
276 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
277 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
/**
 * Creates the base configuration for Kafka producers.
 * <p>
 * The visible code sets the bootstrap servers from the chat backend
 * properties.
 * NOTE(review): any further settings and the return statement are not
 * visible in this excerpt.
 *
 * @param chatBackendProperties the chat backend properties, source of the bootstrap servers
 * @return the default producer properties
 */
281 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
283 Properties properties = new Properties();
284 properties.setProperty(
285 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
286 chatBackendProperties.getKafka().getBootstrapServers());
/**
 * Creates the base configuration for Kafka consumers.
 * <p>
 * The visible code sets the bootstrap servers from the chat backend
 * properties and additionally configures the auto-commit flag and the
 * auto-offset-reset policy.
 * NOTE(review): the values assigned to {@code ENABLE_AUTO_COMMIT_CONFIG} and
 * {@code AUTO_OFFSET_RESET_CONFIG} as well as the return statement are not
 * visible in this excerpt.
 *
 * @param chatBackendProperties the chat backend properties, source of the bootstrap servers
 * @return the default consumer properties
 */
291 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
293 Properties properties = new Properties();
294 properties.setProperty(
295 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
296 chatBackendProperties.getKafka().getBootstrapServers());
297 properties.setProperty(
298 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
300 properties.setProperty(
301 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
/**
 * Creates the HAProxy-based {@code ShardingPublisherStrategy}.
 * <p>
 * The configured HAProxy runtime API setting is split at {@code ":"}; the
 * first part is used as host and the second part as port of an
 * {@link InetSocketAddress}. The strategy is then created with (among
 * others) the configured HAProxy map and the instance id.
 * NOTE(review): the first constructor argument is not visible in this
 * excerpt - presumably it is {@code haproxyAddress}.
 * NOTE(review): the setting is not validated. A value without {@code ":"}
 * fails with an {@code ArrayIndexOutOfBoundsException}, a non-numeric port
 * with a {@code NumberFormatException}, and an IPv6 literal would be split
 * at the wrong position. Consider validating the format and reporting a
 * descriptive configuration error.
 *
 * @param properties the chat backend properties, source of the runtime API address, the map and the instance id
 * @return the sharding publisher strategy
 */
307 ShardingPublisherStrategy shardingPublisherStrategy(
308 ChatBackendProperties properties)
// Expected format of the setting: host:port
310 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
// NOTE(review): Integer.valueOf() boxes the port only to unbox it again; Integer.parseInt() would avoid that
311 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
312 return new HaproxyShardingPublisherStrategy(
314 properties.getKafka().getHaproxyMap(),
315 properties.getInstanceId());
321 return ZoneId.systemDefault();
/**
 * Creates the {@code ChannelReactiveHealthIndicator} for the data channel.
 *
 * @param dataChannel the data channel handed to the indicator
 * @return a new health indicator wrapping the data channel
 */
325 ChannelReactiveHealthIndicator dataChannelHealthIndicator(
326 DataChannel dataChannel)
328 return new ChannelReactiveHealthIndicator(dataChannel);
/**
 * Creates the {@code ChannelReactiveHealthIndicator} for the info channel.
 *
 * @param infoChannel the info channel handed to the indicator
 * @return a new health indicator wrapping the info channel
 */
332 ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
334 return new ChannelReactiveHealthIndicator(infoChannel);