1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25 import java.net.InetSocketAddress;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.Properties;
// Groups the factory methods for the Kafka implementation of the chat backend:
// consumers, producer, (de)serializers, info/data channels, their task
// executors and health indicators.
// The class is conditional on a property below the prefix "chat.backend"
// having the value "kafka".
// NOTE(review): the name of that property is not visible here - confirm.
34 @ConditionalOnProperty(
35 prefix = "chat.backend",
37 havingValue = "kafka")
39 public class KafkaServicesConfiguration
// Instantiates a KafkaServicesThreadPoolTaskExecutorCustomizer through its
// no-argument constructor.
// NOTE(review): presumably it tunes the ThreadPoolTaskExecutor that is injected
// into the channel task executors - confirm against the customizer class.
42 KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
44 return new KafkaServicesThreadPoolTaskExecutorCustomizer();
// Builds the ChannelTaskExecutor for the info channel.
// The bean declaration names "executeChannelTask" as init method and "join" as
// destroy method, so the container calls them when the bean is initialized
// and destroyed.
47 @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
48 ChannelTaskExecutor infoChannelTaskExecutor(
49 ThreadPoolTaskExecutor taskExecutor,
50 InfoChannel infoChannel,
51 Consumer<String, AbstractMessageTo> infoChannelConsumer,
52 WorkAssignor infoChannelWorkAssignor)
54 return new ChannelTaskExecutor(
// The work assignor is the last constructor argument.
58 infoChannelWorkAssignor);
// Provides the WorkAssignor for the info channel.
// It reads the configured info-channel topic, turns the topic's partitions
// into TopicPartition instances and hands that explicit list to
// consumer.assign(...), i.e. the partitions are assigned manually.
// NOTE(review): the origin of `consumer` and `partitionInfo` is not visible
// here - presumably lambda / stream parameters; confirm.
62 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
66 String topic = properties.getKafka().getInfoChannelTopic();
67 List<TopicPartition> partitions = consumer
71 new TopicPartition(topic, partitionInfo.partition()))
73 consumer.assign(partitions);
// Builds the ChannelTaskExecutor for the data channel.
// The bean declaration names "executeChannelTask" as init method and "join" as
// destroy method, so the container calls them when the bean is initialized
// and destroyed.
77 @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
78 ChannelTaskExecutor dataChannelTaskExecutor(
79 ThreadPoolTaskExecutor taskExecutor,
80 DataChannel dataChannel,
81 Consumer<String, AbstractMessageTo> dataChannelConsumer,
82 WorkAssignor dataChannelWorkAssignor)
84 return new ChannelTaskExecutor(
// The work assignor is the last constructor argument.
88 dataChannelWorkAssignor);
// Provides the WorkAssignor for the data channel.
// It subscribes the consumer to a single-element list holding the configured
// data-channel topic and passes dataChannel as second argument of subscribe.
// NOTE(review): presumably dataChannel serves as the rebalance listener of the
// subscription - confirm against DataChannel and the type of `consumer`.
92 WorkAssignor dataChannelWorkAssignor(
93 ChatBackendProperties properties,
94 DataChannel dataChannel)
99 List.of(properties.getKafka().getDataChannelTopic());
100 consumer.subscribe(topics, dataChannel);
// Creates the KafkaChatHomeService.
// The configured number of partitions is its first constructor argument.
// NOTE(review): the remaining constructor arguments are not visible here -
// presumably the two injected channels; confirm.
105 KafkaChatHomeService kafkaChatHome(
106 ChatBackendProperties properties,
107 InfoChannel infoChannel,
108 DataChannel dataChannel)
110 return new KafkaChatHomeService(
111 properties.getKafka().getNumPartitions(),
// Creates the InfoChannel and registers it with the ChannelMediator.
// The visible constructor arguments are taken from the Kafka section of the
// backend properties: info-channel topic, polling interval, number of
// partitions and instance URI.
117 InfoChannel infoChannel(
118 ChatBackendProperties properties,
119 Producer<String, AbstractMessageTo> producer,
120 Consumer<String, AbstractMessageTo> infoChannelConsumer,
121 ChannelMediator channelMediator)
123 InfoChannel infoChannel = new InfoChannel(
124 properties.getKafka().getInfoChannelTopic(),
127 properties.getKafka().getPollingInterval(),
128 properties.getKafka().getNumPartitions(),
129 properties.getKafka().getInstanceUri(),
// Make the freshly created channel known to the mediator.
131 channelMediator.setInfoChannel(infoChannel);
// Creates the DataChannel and registers it with the ChannelMediator.
// The visible constructor arguments are: instance id, data-channel topic,
// number of partitions, polling interval, chat-room history limit and the
// sharding publisher strategy.
// NOTE(review): further parameters and arguments are not visible here.
136 DataChannel dataChannel(
137 ChatBackendProperties properties,
138 Producer<String, AbstractMessageTo> producer,
139 Consumer<String, AbstractMessageTo> dataChannelConsumer,
142 ChannelMediator channelMediator,
143 ShardingPublisherStrategy shardingPublisherStrategy)
145 DataChannel dataChannel = new DataChannel(
146 properties.getInstanceId(),
147 properties.getKafka().getDataChannelTopic(),
151 properties.getKafka().getNumPartitions(),
152 properties.getKafka().getPollingInterval(),
153 properties.getChatroomHistoryLimit(),
156 shardingPublisherStrategy);
// Make the freshly created channel known to the mediator.
157 channelMediator.setDataChannel(dataChannel);
// Creates an empty ChannelMediator. The info and data channel factory methods
// of this class register their channels on it via setInfoChannel /
// setDataChannel.
162 ChannelMediator channelMediator()
164 return new ChannelMediator();
// Creates the Kafka producer for AbstractMessageTo values with String keys.
// The default producer properties are copied into a fresh map (keys converted
// with toString()), so the shared defaults are not modified; the client id is
// derived from the configured client-id prefix plus "_PRODUCER".
168 Producer<String, AbstractMessageTo> producer(
169 Properties defaultProducerProperties,
170 ChatBackendProperties chatBackendProperties,
171 StringSerializer stringSerializer,
172 JsonSerializer<AbstractMessageTo> messageSerializer)
174 Map<String, Object> properties = new HashMap<>();
175 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
177 ProducerConfig.CLIENT_ID_CONFIG,
178 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
179 return new KafkaProducer<>(
// Creates the StringSerializer that the producer factory method receives as
// its stringSerializer parameter.
186 StringSerializer stringSerializer()
188 return new StringSerializer();
// Creates the JsonSerializer for AbstractMessageTo and configures it with the
// given type mappings (JsonSerializer.TYPE_MAPPINGS).
// NOTE(review): the remaining arguments of configure(...) are not visible here.
192 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
194 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
195 serializer.configure(
197 JsonSerializer.TYPE_MAPPINGS, typeMappings),
// Creates the Kafka consumer for the info channel.
// The default consumer properties are copied into a fresh map (keys converted
// with toString()); the client id is the configured client-id prefix plus
// "_INFO_CHANNEL_CONSUMER" and a group id is set as well.
// NOTE(review): the group-id value is not visible here - confirm.
203 Consumer<String, AbstractMessageTo> infoChannelConsumer(
204 Properties defaultConsumerProperties,
205 ChatBackendProperties chatBackendProperties,
206 StringDeserializer stringDeserializer,
207 JsonDeserializer<AbstractMessageTo> messageDeserializer)
209 Map<String, Object> properties = new HashMap<>();
210 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
212 ConsumerConfig.CLIENT_ID_CONFIG,
213 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
215 ConsumerConfig.GROUP_ID_CONFIG,
217 return new KafkaConsumer<>(
// The message deserializer is the last constructor argument.
220 messageDeserializer);
// Creates the Kafka consumer for the data channel.
// The default consumer properties are copied into a fresh map (keys converted
// with toString()); the client id is the configured client-id prefix plus
// "_DATA_CHANNEL_CONSUMER" and a group id is set as well.
// NOTE(review): the group-id value is not visible here - confirm.
224 Consumer<String, AbstractMessageTo> dataChannelConsumer(
225 Properties defaultConsumerProperties,
226 ChatBackendProperties chatBackendProperties,
227 StringDeserializer stringDeserializer,
228 JsonDeserializer<AbstractMessageTo> messageDeserializer)
230 Map<String, Object> properties = new HashMap<>();
231 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
233 ConsumerConfig.CLIENT_ID_CONFIG,
234 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
236 ConsumerConfig.GROUP_ID_CONFIG,
238 return new KafkaConsumer<>(
// The message deserializer is the last constructor argument.
241 messageDeserializer);
// Creates the StringDeserializer that both consumer factory methods receive as
// their stringDeserializer parameter.
245 StringDeserializer stringDeserializer()
247 return new StringDeserializer();
// Creates the JsonDeserializer for AbstractMessageTo.
// Trusted packages are restricted to the package of this configuration class,
// and the given type mappings are applied (JsonDeserializer.TYPE_MAPPINGS).
// NOTE(review): the remaining arguments of configure(...) are not visible here.
251 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
253 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
254 deserializer.configure(
256 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
257 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
// Builds the type-mapping string shared by the JSON serializer and
// deserializer: comma-separated "token:canonical class name" pairs.
// The visible pairs map "event_chatroom_created" to EventChatRoomCreated and
// "event_chatmessage_received" to EventChatMessageReceivedTo.
263 String typeMappings ()
266 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
267 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
// Creates the base properties for producers: the bootstrap servers are taken
// from the Kafka section of the backend properties.
271 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
273 Properties properties = new Properties();
274 properties.setProperty(
275 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
276 chatBackendProperties.getKafka().getBootstrapServers());
// Creates the base properties shared by the consumers: bootstrap servers from
// the Kafka section of the backend properties, plus explicit settings for
// enable.auto.commit and auto.offset.reset.
// NOTE(review): the values of the latter two settings are not visible here.
281 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
283 Properties properties = new Properties();
284 properties.setProperty(
285 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
286 chatBackendProperties.getKafka().getBootstrapServers());
287 properties.setProperty(
288 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
290 properties.setProperty(
291 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
// Creates the HAProxy-based ShardingPublisherStrategy.
// The configured HAProxy runtime API address is split at ":" into host and
// port and turned into an InetSocketAddress; the configured HAProxy map and
// the instance id are passed on as well.
297 ShardingPublisherStrategy shardingPublisherStrategy(
298 ChatBackendProperties properties)
// Expected format is "host:port". A value without ":" fails below with an
// ArrayIndexOutOfBoundsException, a non-numeric port with a
// NumberFormatException; there is no explicit validation.
300 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
301 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
302 return new HaproxyShardingPublisherStrategy(
304 properties.getKafka().getHaproxyMap(),
305 properties.getInstanceId());
311 return ZoneId.systemDefault();
// Creates the ChannelReactiveHealthIndicator that wraps the data channel.
315 ChannelReactiveHealthIndicator dataChannelHealthIndicator(
316 DataChannel dataChannel)
318 return new ChannelReactiveHealthIndicator(dataChannel);
// Creates the ChannelReactiveHealthIndicator that wraps the info channel.
322 ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
324 return new ChannelReactiveHealthIndicator(infoChannel);