1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25 import java.net.InetSocketAddress;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.Properties;
34 @ConditionalOnProperty(
35 prefix = "chat.backend",
37 havingValue = "kafka")
39 public class KafkaServicesConfiguration
42 ChannelTaskRunner channelTaskRunner(
43 ChannelTaskExecutor infoChannelTaskExecutor,
44 ChannelTaskExecutor dataChannelTaskExecutor)
46 return new ChannelTaskRunner(
47 infoChannelTaskExecutor,
48 dataChannelTaskExecutor);
52 ChannelTaskExecutor infoChannelTaskExecutor(
53 ThreadPoolTaskExecutor taskExecutor,
54 InfoChannel infoChannel,
55 Consumer<String, AbstractMessageTo> infoChannelConsumer,
56 WorkAssignor infoChannelWorkAssignor)
58 return new ChannelTaskExecutor(
62 infoChannelWorkAssignor);
66 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70 String topic = properties.getKafka().getInfoChannelTopic();
71 List<TopicPartition> partitions = consumer
75 new TopicPartition(topic, partitionInfo.partition()))
77 consumer.assign(partitions);
82 ChannelTaskExecutor dataChannelTaskExecutor(
83 ThreadPoolTaskExecutor taskExecutor,
84 DataChannel dataChannel,
85 Consumer<String, AbstractMessageTo> dataChannelConsumer,
86 WorkAssignor dataChannelWorkAssignor)
88 return new ChannelTaskExecutor(
92 dataChannelWorkAssignor);
96 WorkAssignor dataChannelWorkAssignor(
97 ChatBackendProperties properties,
98 DataChannel dataChannel)
102 List<String> topics =
103 List.of(properties.getKafka().getDataChannelTopic());
104 consumer.subscribe(topics, dataChannel);
109 KafkaChatHomeService kafkaChatHome(
110 ChatBackendProperties properties,
111 InfoChannel infoChannel,
112 DataChannel dataChannel)
114 return new KafkaChatHomeService(
115 properties.getKafka().getNumPartitions(),
121 InfoChannel infoChannel(
122 ChatBackendProperties properties,
123 Producer<String, AbstractMessageTo> producer,
124 Consumer<String, AbstractMessageTo> infoChannelConsumer,
125 ChannelMediator channelMediator)
127 InfoChannel infoChannel = new InfoChannel(
128 properties.getKafka().getInfoChannelTopic(),
131 properties.getKafka().getPollingInterval(),
132 properties.getKafka().getNumPartitions(),
133 properties.getKafka().getInstanceUri(),
135 channelMediator.setInfoChannel(infoChannel);
140 DataChannel dataChannel(
141 ChatBackendProperties properties,
142 Producer<String, AbstractMessageTo> producer,
143 Consumer<String, AbstractMessageTo> dataChannelConsumer,
146 ChannelMediator channelMediator,
147 ShardingPublisherStrategy shardingPublisherStrategy)
149 DataChannel dataChannel = new DataChannel(
150 properties.getInstanceId(),
151 properties.getKafka().getDataChannelTopic(),
155 properties.getKafka().getNumPartitions(),
156 properties.getKafka().getPollingInterval(),
157 properties.getChatroomBufferSize(),
160 shardingPublisherStrategy);
161 channelMediator.setDataChannel(dataChannel);
166 ChannelMediator channelMediator()
168 return new ChannelMediator();
172 Producer<String, AbstractMessageTo> producer(
173 Properties defaultProducerProperties,
174 ChatBackendProperties chatBackendProperties,
175 StringSerializer stringSerializer,
176 JsonSerializer<AbstractMessageTo> messageSerializer)
178 Map<String, Object> properties = new HashMap<>();
179 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
181 ProducerConfig.CLIENT_ID_CONFIG,
182 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
183 return new KafkaProducer<>(
190 StringSerializer stringSerializer()
192 return new StringSerializer();
196 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
198 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
199 serializer.configure(
201 JsonSerializer.TYPE_MAPPINGS, typeMappings),
207 Consumer<String, AbstractMessageTo> infoChannelConsumer(
208 Properties defaultConsumerProperties,
209 ChatBackendProperties chatBackendProperties,
210 StringDeserializer stringDeserializer,
211 JsonDeserializer<AbstractMessageTo> messageDeserializer)
213 Map<String, Object> properties = new HashMap<>();
214 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
216 ConsumerConfig.CLIENT_ID_CONFIG,
217 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
219 ConsumerConfig.GROUP_ID_CONFIG,
221 return new KafkaConsumer<>(
224 messageDeserializer);
228 Consumer<String, AbstractMessageTo> dataChannelConsumer(
229 Properties defaultConsumerProperties,
230 ChatBackendProperties chatBackendProperties,
231 StringDeserializer stringDeserializer,
232 JsonDeserializer<AbstractMessageTo> messageDeserializer)
234 Map<String, Object> properties = new HashMap<>();
235 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
237 ConsumerConfig.CLIENT_ID_CONFIG,
238 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
240 ConsumerConfig.GROUP_ID_CONFIG,
242 return new KafkaConsumer<>(
245 messageDeserializer);
249 StringDeserializer stringDeserializer()
251 return new StringDeserializer();
255 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
257 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
258 deserializer.configure(
260 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
261 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
267 String typeMappings ()
270 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
271 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
275 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
277 Properties properties = new Properties();
278 properties.setProperty(
279 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
280 chatBackendProperties.getKafka().getBootstrapServers());
285 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
287 Properties properties = new Properties();
288 properties.setProperty(
289 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
290 chatBackendProperties.getKafka().getBootstrapServers());
291 properties.setProperty(
292 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
294 properties.setProperty(
295 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
301 ShardingPublisherStrategy shardingPublisherStrategy(
302 ChatBackendProperties properties)
304 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
305 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
306 return new HaproxyShardingPublisherStrategy(
308 properties.getKafka().getHaproxyMap(),
309 properties.getInstanceId());
315 return ZoneId.systemDefault();