1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
32 import java.util.Properties;
/**
 * Declares the factory methods that wire up the Kafka-based parts of the chat
 * backend: producer, consumers, (de)serializers, the info- and data-channel
 * and the helpers that run the consumers.
 *
 * The class is guarded by {@code @ConditionalOnProperty}: it is only taken
 * into account if a property below the prefix {@code chat.backend} has the
 * value {@code kafka}.
 */
// NOTE(review): the name of the checked property is not visible in this
// excerpt - confirm against the full file.
35 @ConditionalOnProperty(
36 prefix = "chat.backend",
38 havingValue = "kafka")
40 public class KafkaServicesConfiguration
/**
 * Creates the {@link ConsumerTaskRunner} from the two consumer task executors.
 *
 * @param infoChannelConsumerTaskExecutor passed as first constructor argument
 * @param dataChannelConsumerTaskExecutor passed as second constructor argument
 * @param infoChannel the injected info channel
 * @return a newly created {@code ConsumerTaskRunner}
 */
43 ConsumerTaskRunner consumerTaskRunner(
44 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46 InfoChannel infoChannel)
48 return new ConsumerTaskRunner(
49 infoChannelConsumerTaskExecutor,
50 dataChannelConsumerTaskExecutor,
// NOTE(review): the remaining constructor argument(s) are not visible in this
// excerpt - presumably infoChannel; confirm against the full file.
/**
 * Creates the {@link ConsumerTaskExecutor} that belongs to the info channel.
 *
 * @param taskExecutor the injected thread-pool task executor
 * @param infoChannel the injected info channel
 * @param infoChannelConsumer the Kafka consumer dedicated to the info channel
 * @param infoChannelWorkAssignor passed as last constructor argument
 * @return a newly created {@code ConsumerTaskExecutor}
 */
55 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56 ThreadPoolTaskExecutor taskExecutor,
57 InfoChannel infoChannel,
58 Consumer<String, AbstractMessageTo> infoChannelConsumer,
59 WorkAssignor infoChannelWorkAssignor)
61 return new ConsumerTaskExecutor(
// NOTE(review): the leading constructor arguments are not visible in this
// excerpt - presumably taskExecutor, infoChannel and infoChannelConsumer; confirm.
65 infoChannelWorkAssignor);
/**
 * Creates the {@link WorkAssignor} for the info channel. The visible code
 * reads the name of the info-channel topic from the properties, builds one
 * {@link TopicPartition} per partition info of that topic and hands the
 * resulting list to {@code consumer.assign(...)}, i.e. the partitions are
 * assigned explicitly.
 *
 * @param properties source of the info-channel topic name
 * @return the work assignor for the info-channel consumer
 */
69 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
73 String topic = properties.getKafka().getInfoChannelTopic();
// NOTE(review): the calls between "consumer" and the mapping below are not
// visible in this excerpt - presumably consumer.partitionsFor(topic) streamed
// and collected into a list; confirm against the full file.
74 List<TopicPartition> partitions = consumer
78 new TopicPartition(topic, partitionInfo.partition()))
// Explicit assignment of the collected partitions to this consumer.
80 consumer.assign(partitions);
/**
 * Creates the {@link ConsumerTaskExecutor} that belongs to the data channel.
 *
 * @param taskExecutor the injected thread-pool task executor
 * @param dataChannel the injected data channel
 * @param dataChannelConsumer the Kafka consumer dedicated to the data channel
 * @param dataChannelWorkAssignor passed as last constructor argument
 * @return a newly created {@code ConsumerTaskExecutor}
 */
85 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86 ThreadPoolTaskExecutor taskExecutor,
87 DataChannel dataChannel,
88 Consumer<String, AbstractMessageTo> dataChannelConsumer,
89 WorkAssignor dataChannelWorkAssignor)
91 return new ConsumerTaskExecutor(
// NOTE(review): the leading constructor arguments are not visible in this
// excerpt - presumably taskExecutor, dataChannel and dataChannelConsumer; confirm.
95 dataChannelWorkAssignor);
/**
 * Creates the {@link WorkAssignor} for the data channel. The visible code
 * subscribes the consumer to the single data-channel topic taken from the
 * properties and passes the data channel as second argument of
 * {@code subscribe(...)}.
 *
 * @param properties source of the data-channel topic name
 * @param dataChannel handed to {@code consumer.subscribe(...)} together with the topic list
 * @return the work assignor for the data-channel consumer
 */
99 WorkAssignor dataChannelWorkAssignor(
100 ChatBackendProperties properties,
101 DataChannel dataChannel)
105 List<String> topics =
106 List.of(properties.getKafka().getDataChannelTopic());
// NOTE(review): presumably DataChannel implements Kafka's
// ConsumerRebalanceListener, so that it is notified about partition
// assignments and revocations - confirm against the DataChannel class.
107 consumer.subscribe(topics, dataChannel);
/**
 * Creates the {@link KafkaChatHomeService}. The first constructor argument is
 * the number of partitions configured in the Kafka section of the properties.
 *
 * @param properties source of the configured number of partitions
 * @param infoChannel the injected info channel
 * @param dataChannel the injected data channel
 * @return a newly created {@code KafkaChatHomeService}
 */
112 KafkaChatHomeService kafkaChatHome(
113 ChatBackendProperties properties,
114 InfoChannel infoChannel,
115 DataChannel dataChannel)
117 return new KafkaChatHomeService(
118 properties.getKafka().getNumPartitions(),
// NOTE(review): the remaining constructor arguments are not visible in this
// excerpt - presumably infoChannel and dataChannel; confirm.
/**
 * Creates the {@link InfoChannel} and registers it with the channel mediator.
 * Topic name, polling interval, number of partitions and instance URI are all
 * read from the Kafka section of the properties.
 *
 * @param properties source of the Kafka related settings
 * @param producer the injected Kafka producer
 * @param infoChannelConsumer the Kafka consumer dedicated to the info channel
 * @param channelMediator receives the new channel via {@code setInfoChannel(...)}
 * @return the info channel (NOTE(review): the return statement is not visible
 *     in this excerpt - presumably the instance created here; confirm)
 */
124 InfoChannel infoChannel(
125 ChatBackendProperties properties,
126 Producer<String, AbstractMessageTo> producer,
127 Consumer<String, AbstractMessageTo> infoChannelConsumer,
128 ChannelMediator channelMediator)
130 InfoChannel infoChannel = new InfoChannel(
131 properties.getKafka().getInfoChannelTopic(),
// NOTE(review): two constructor arguments are not visible here - presumably
// producer and infoChannelConsumer; confirm.
134 properties.getKafka().getPollingInterval(),
135 properties.getKafka().getNumPartitions(),
136 properties.getKafka().getInstanceUri());
// Make the freshly created channel known to the mediator.
137 channelMediator.setInfoChannel(infoChannel);
/**
 * Creates the {@link DataChannel}. Instance id and chatroom buffer size are
 * read from the top level of the properties; topic name, number of partitions
 * and polling interval come from their Kafka section.
 *
 * @param properties source of the settings listed above
 * @param producer the injected Kafka producer
 * @param dataChannelConsumer the Kafka consumer dedicated to the data channel
 * @param channelMediator the injected channel mediator
 * @param shardingPublisherStrategy passed as last constructor argument
 * @return a newly created {@code DataChannel}
 */
142 DataChannel dataChannel(
143 ChatBackendProperties properties,
144 Producer<String, AbstractMessageTo> producer,
145 Consumer<String, AbstractMessageTo> dataChannelConsumer,
// NOTE(review): further parameters appear to be missing from this excerpt
// between dataChannelConsumer and channelMediator - confirm.
148 ChannelMediator channelMediator,
149 ShardingPublisherStrategy shardingPublisherStrategy)
151 return new DataChannel(
152 properties.getInstanceId(),
153 properties.getKafka().getDataChannelTopic(),
// NOTE(review): several constructor arguments are not visible in this excerpt
// (here and before the last argument) - confirm against the full file.
157 properties.getKafka().getNumPartitions(),
158 properties.getKafka().getPollingInterval(),
159 properties.getChatroomBufferSize(),
162 shardingPublisherStrategy);
/**
 * Creates the {@link ChannelMediator} via its no-argument constructor.
 *
 * @return a newly created {@code ChannelMediator}
 */
166 ChannelMediator channelMediator()
168 return new ChannelMediator();
/**
 * Creates the Kafka producer used for {@link AbstractMessageTo} messages.
 * The default producer properties are copied into a fresh map and the client
 * id is derived from the configured client-id prefix with the suffix
 * {@code _PRODUCER}.
 *
 * @param defaultProducerProperties base configuration that is copied
 * @param chatBackendProperties source of the client-id prefix
 * @param stringSerializer the injected key serializer bean
 * @param messageSerializer the injected value serializer bean
 * @return a newly created {@link KafkaProducer}
 */
172 Producer<String, AbstractMessageTo> producer(
173 Properties defaultProducerProperties,
174 ChatBackendProperties chatBackendProperties,
175 StringSerializer stringSerializer,
176 JsonSerializer<AbstractMessageTo> messageSerializer)
178 Map<String, Object> properties = new HashMap<>();
// Properties is keyed by Object: the keys are converted with toString() so
// that the entries fit into the Map<String, Object>.
179 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): the call that receives the following key/value pair is not
// visible in this excerpt - presumably properties.put(...); confirm.
181 ProducerConfig.CLIENT_ID_CONFIG,
182 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
// NOTE(review): the constructor arguments are not visible in this excerpt -
// presumably the map and the two serializers; confirm.
183 return new KafkaProducer<>(
/**
 * Creates a {@link StringSerializer} with its default settings.
 *
 * @return a newly created {@code StringSerializer}
 */
190 StringSerializer stringSerializer()
192 return new StringSerializer();
/**
 * Creates the JSON serializer for {@link AbstractMessageTo} messages and
 * configures it with the given type mappings under the key
 * {@code JsonSerializer.TYPE_MAPPINGS}.
 *
 * @param typeMappings the type-mapping string handed to the serializer
 * @return the serializer (NOTE(review): the return statement is not visible
 *     in this excerpt - presumably the instance configured here; confirm)
 */
196 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
198 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
// NOTE(review): the construction of the configuration map and the remaining
// argument(s) of configure(...) are not visible in this excerpt - confirm.
199 serializer.configure(
201 JsonSerializer.TYPE_MAPPINGS, typeMappings),
/**
 * Creates the Kafka consumer dedicated to the info channel. The default
 * consumer properties are copied into a fresh map; the client id is derived
 * from the configured client-id prefix with the suffix
 * {@code _INFO_CHANNEL_CONSUMER} and a group id is set as well.
 *
 * @param defaultConsumerProperties base configuration that is copied
 * @param chatBackendProperties source of the client-id prefix
 * @param stringDeserializer the injected key deserializer bean
 * @param messageDeserializer passed as last constructor argument
 * @return a newly created {@link KafkaConsumer}
 */
207 Consumer<String, AbstractMessageTo> infoChannelConsumer(
208 Properties defaultConsumerProperties,
209 ChatBackendProperties chatBackendProperties,
210 StringDeserializer stringDeserializer,
211 JsonDeserializer<AbstractMessageTo> messageDeserializer)
213 Map<String, Object> properties = new HashMap<>();
// Properties is keyed by Object: the keys are converted with toString() so
// that the entries fit into the Map<String, Object>.
214 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): the calls that receive the following keys are not visible in
// this excerpt - presumably properties.put(...); confirm.
216 ConsumerConfig.CLIENT_ID_CONFIG,
217 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
// NOTE(review): the value used for the group id is not visible - confirm.
219 ConsumerConfig.GROUP_ID_CONFIG,
// NOTE(review): the leading constructor arguments are not visible -
// presumably the map and stringDeserializer; confirm.
221 return new KafkaConsumer<>(
224 messageDeserializer);
/**
 * Creates the Kafka consumer dedicated to the data channel. The default
 * consumer properties are copied into a fresh map; the client id is derived
 * from the configured client-id prefix with the suffix
 * {@code _DATA_CHANNEL_CONSUMER} and a group id is set as well.
 *
 * @param defaultConsumerProperties base configuration that is copied
 * @param chatBackendProperties source of the client-id prefix
 * @param stringDeserializer the injected key deserializer bean
 * @param messageDeserializer passed as last constructor argument
 * @return a newly created {@link KafkaConsumer}
 */
228 Consumer<String, AbstractMessageTo> dataChannelConsumer(
229 Properties defaultConsumerProperties,
230 ChatBackendProperties chatBackendProperties,
231 StringDeserializer stringDeserializer,
232 JsonDeserializer<AbstractMessageTo> messageDeserializer)
234 Map<String, Object> properties = new HashMap<>();
// Properties is keyed by Object: the keys are converted with toString() so
// that the entries fit into the Map<String, Object>.
235 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): the calls that receive the following keys are not visible in
// this excerpt - presumably properties.put(...); confirm.
237 ConsumerConfig.CLIENT_ID_CONFIG,
238 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
// NOTE(review): the value used for the group id is not visible - confirm.
240 ConsumerConfig.GROUP_ID_CONFIG,
// NOTE(review): the leading constructor arguments are not visible -
// presumably the map and stringDeserializer; confirm.
242 return new KafkaConsumer<>(
245 messageDeserializer);
/**
 * Creates a {@link StringDeserializer} with its default settings.
 *
 * @return a newly created {@code StringDeserializer}
 */
249 StringDeserializer stringDeserializer()
251 return new StringDeserializer();
/**
 * Creates the JSON deserializer for {@link AbstractMessageTo} messages. It is
 * configured with the package of this configuration class as trusted package
 * and with the given type mappings.
 *
 * @param typeMappings the type-mapping string handed to the deserializer
 * @return the deserializer (NOTE(review): the return statement is not visible
 *     in this excerpt - presumably the instance configured here; confirm)
 */
255 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
257 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
// NOTE(review): the construction of the configuration map and the remaining
// argument(s) of configure(...) are not visible in this excerpt - confirm.
258 deserializer.configure(
// The trusted package is derived from this class at runtime instead of being
// spelled out as a string literal.
260 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
261 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
/**
 * Provides the type-mapping string: a comma separated list of
 * {@code token:canonical-class-name} pairs that maps
 * {@code event_chatroom_created} to {@link EventChatRoomCreated} and
 * {@code event_chatmessage_received} to {@link EventChatMessageReceivedTo}.
 * The same string is consumed by the serializer and the deserializer factory
 * methods of this class through their {@code typeMappings} parameter.
 *
 * @return the type-mapping string
 */
267 String typeMappings ()
270 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
271 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
/**
 * Creates the base configuration shared by producers: a {@link Properties}
 * object in which the bootstrap servers are set to the value configured in
 * the Kafka section of the chat backend properties.
 *
 * @param chatBackendProperties source of the bootstrap servers
 * @return the base producer configuration (NOTE(review): the return statement
 *     is not visible in this excerpt - presumably the object built here; confirm)
 */
275 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
277 Properties properties = new Properties();
278 properties.setProperty(
279 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
280 chatBackendProperties.getKafka().getBootstrapServers());
/**
 * Creates the base configuration shared by consumers: a {@link Properties}
 * object with the bootstrap servers taken from the Kafka section of the chat
 * backend properties, plus explicit settings for auto-commit and
 * offset-reset behaviour.
 *
 * @param chatBackendProperties source of the bootstrap servers
 * @return the base consumer configuration (NOTE(review): the return statement
 *     is not visible in this excerpt - presumably the object built here; confirm)
 */
285 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
287 Properties properties = new Properties();
288 properties.setProperty(
289 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
290 chatBackendProperties.getKafka().getBootstrapServers());
// NOTE(review): the value assigned to the auto-commit setting is not visible
// in this excerpt - confirm against the full file.
291 properties.setProperty(
292 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
// NOTE(review): the value assigned to the offset-reset setting is not visible
// in this excerpt - confirm against the full file.
294 properties.setProperty(
295 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
/**
 * Creates the {@link ShardingPublisherStrategy} backed by HAProxy. The
 * configured HAProxy runtime API value is split at {@code ":"}; the first
 * part is used as host name and the second part as port of an
 * {@link InetSocketAddress}. The HAProxy map and the instance id from the
 * properties are passed on to the strategy.
 *
 * @param properties source of the runtime API address, the HAProxy map and the instance id
 * @return a newly created {@link HaproxyShardingPublisherStrategy}
 */
301 ShardingPublisherStrategy shardingPublisherStrategy(
302 ChatBackendProperties properties)
// NOTE(review): this expects exactly the form "host:port". A value without a
// port makes parts[1] fail with ArrayIndexOutOfBoundsException, a
// non-numeric port fails with NumberFormatException, and an IPv6 literal
// would be split at the wrong colon - confirm that the property is validated.
304 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
305 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
306 return new HaproxyShardingPublisherStrategy(
// NOTE(review): the first constructor argument is not visible in this
// excerpt - presumably haproxyAddress; confirm.
308 properties.getKafka().getHaproxyMap(),
309 properties.getInstanceId());
315 return ZoneId.systemDefault();