1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
32 import java.util.Properties;
// Wires the Kafka-based implementation of the chat backend: channels, consumers,
// producer, (de)serializers and the sharding publisher strategy declared below.
// The visible condition activates this class only when a property under the
// "chat.backend" prefix has the value "kafka".
// NOTE(review): the attribute naming the property (name/value) is not visible in
// this excerpt — confirm which property is checked against the full file.
35 @ConditionalOnProperty(
36 prefix = "chat.backend",
38 havingValue = "kafka")
40 public class KafkaServicesConfiguration
// Creates the ConsumerTaskRunner from the two consumer task executors
// (one for the info channel, one for the data channel).
// NOTE(review): the infoChannel parameter is declared, but the line that would
// pass it to the constructor is not visible in this excerpt — confirm it is used.
43 ConsumerTaskRunner consumerTaskRunner(
44 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46 InfoChannel infoChannel)
48 return new ConsumerTaskRunner(
49 infoChannelConsumerTaskExecutor,
50 dataChannelConsumerTaskExecutor,
// Creates the ConsumerTaskExecutor that serves the info channel.
// The visible constructor call ends with infoChannelWorkAssignor; the preceding
// arguments are not visible in this excerpt (presumably taskExecutor, infoChannel
// and infoChannelConsumer, in declaration order — TODO confirm).
55 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56 ThreadPoolTaskExecutor taskExecutor,
57 InfoChannel infoChannel,
58 Consumer<String, AbstractMessageTo> infoChannelConsumer,
59 WorkAssignor infoChannelWorkAssignor)
61 return new ConsumerTaskExecutor(
65 infoChannelWorkAssignor);
// Provides the WorkAssignor for the info channel.
// The visible body reads the info-channel topic name from the properties, maps
// partition infos to TopicPartition instances of that topic and hands the
// resulting list to consumer.assign(...), i.e. the partitions are assigned
// explicitly rather than via subscribe(...).
// NOTE(review): where `consumer` is bound and where the partition infos come from
// is not visible here (presumably a lambda parameter and
// consumer.partitionsFor(topic)) — confirm against the full file.
69 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
73 String topic = properties.getKafka().getInfoChannelTopic();
74 List<TopicPartition> partitions = consumer
78 new TopicPartition(topic, partitionInfo.partition()))
80 consumer.assign(partitions);
// Creates the ConsumerTaskExecutor that serves the data channel.
// The visible constructor call ends with dataChannelWorkAssignor; the preceding
// arguments are not visible in this excerpt (presumably taskExecutor, dataChannel
// and dataChannelConsumer, in declaration order — TODO confirm).
85 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86 ThreadPoolTaskExecutor taskExecutor,
87 DataChannel dataChannel,
88 Consumer<String, AbstractMessageTo> dataChannelConsumer,
89 WorkAssignor dataChannelWorkAssignor)
91 return new ConsumerTaskExecutor(
95 dataChannelWorkAssignor);
// Provides the WorkAssignor for the data channel.
// The visible body subscribes the consumer to a single-element topic list holding
// the configured data-channel topic and passes dataChannel as the second argument
// of subscribe(...).
// NOTE(review): presumably DataChannel implements ConsumerRebalanceListener and is
// notified about partition assignment/revocation — confirm against DataChannel.
99 WorkAssignor dataChannelWorkAssignor(
100 ChatBackendProperties properties,
101 DataChannel dataChannel)
105 List<String> topics =
106 List.of(properties.getKafka().getDataChannelTopic());
107 consumer.subscribe(topics, dataChannel);
// Creates the KafkaChatHomeService.
// The first constructor argument is the configured number of partitions; the
// remaining arguments are not visible in this excerpt (presumably the injected
// infoChannel and dataChannel — TODO confirm).
112 KafkaChatHomeService kafkaChatHome(
113 ChatBackendProperties properties,
114 InfoChannel infoChannel,
115 DataChannel dataChannel)
117 return new KafkaChatHomeService(
118 properties.getKafka().getNumPartitions(),
// Creates the InfoChannel and registers it with the ChannelMediator.
// Visible constructor arguments, in order: info-channel topic, polling interval,
// number of partitions, instance URI. Further arguments between and after them
// are not visible in this excerpt (presumably the producer, the consumer and the
// mediator — TODO confirm).
124 InfoChannel infoChannel(
125 ChatBackendProperties properties,
126 Producer<String, AbstractMessageTo> producer,
127 Consumer<String, AbstractMessageTo> infoChannelConsumer,
128 ChannelMediator channelMediator)
130 InfoChannel infoChannel = new InfoChannel(
131 properties.getKafka().getInfoChannelTopic(),
// Note the order: polling interval first, then number of partitions.
134 properties.getKafka().getPollingInterval(),
135 properties.getKafka().getNumPartitions(),
136 properties.getKafka().getInstanceUri(),
// Make the freshly created channel known to the mediator.
138 channelMediator.setInfoChannel(infoChannel);
// Creates the DataChannel and registers it with the ChannelMediator.
// Visible constructor arguments, in order: instance id, data-channel topic,
// number of partitions, polling interval, chat-room buffer size and, last, the
// sharding publisher strategy. Arguments on the lines not shown in this excerpt
// (and two parameter lines of the signature) are unknown — TODO confirm.
143 DataChannel dataChannel(
144 ChatBackendProperties properties,
145 Producer<String, AbstractMessageTo> producer,
146 Consumer<String, AbstractMessageTo> dataChannelConsumer,
149 ChannelMediator channelMediator,
150 ShardingPublisherStrategy shardingPublisherStrategy)
152 DataChannel dataChannel = new DataChannel(
153 properties.getInstanceId(),
154 properties.getKafka().getDataChannelTopic(),
// Note the order: number of partitions first, then polling interval
// (the reverse of the order used for the InfoChannel constructor in this file).
158 properties.getKafka().getNumPartitions(),
159 properties.getKafka().getPollingInterval(),
160 properties.getChatroomBufferSize(),
163 shardingPublisherStrategy);
// Make the freshly created channel known to the mediator.
164 channelMediator.setDataChannel(dataChannel);
// Creates the ChannelMediator. In this file, the info channel and the data channel
// are registered with it via setInfoChannel(...) and setDataChannel(...).
169 ChannelMediator channelMediator()
171 return new ChannelMediator();
// Creates the Kafka producer for AbstractMessageTo messages keyed by String.
// The default producer properties are copied into a fresh map (keys converted
// with toString()), so the shared Properties object itself is not modified here.
// The client id is the configured client-id prefix followed by "_PRODUCER".
// NOTE(review): the call that receives the client-id key/value pair and the
// arguments of the KafkaProducer constructor are not visible in this excerpt
// (presumably properties.put(...) and the map plus both serializers) — confirm.
175 Producer<String, AbstractMessageTo> producer(
176 Properties defaultProducerProperties,
177 ChatBackendProperties chatBackendProperties,
178 StringSerializer stringSerializer,
179 JsonSerializer<AbstractMessageTo> messageSerializer)
181 Map<String, Object> properties = new HashMap<>();
182 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
184 ProducerConfig.CLIENT_ID_CONFIG,
185 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
186 return new KafkaProducer<>(
// Provides a new StringSerializer instance (the producer method in this file
// declares a parameter of this type; presumably it serializes the record keys).
193 StringSerializer stringSerializer()
195 return new StringSerializer();
// Creates the JsonSerializer for AbstractMessageTo and configures it with the
// given type mappings (JsonSerializer.TYPE_MAPPINGS).
// NOTE(review): the remaining arguments of configure(...) — the enclosing map
// construction and the isKey flag — are not visible in this excerpt.
199 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
201 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
202 serializer.configure(
204 JsonSerializer.TYPE_MAPPINGS, typeMappings),
// Creates the Kafka consumer used for the info channel.
// The default consumer properties are copied into a fresh map (keys converted
// with toString()); the client id is the configured client-id prefix followed by
// "_INFO_CHANNEL_CONSUMER", and a group id is configured as well.
// NOTE(review): the group-id value, the calls receiving the key/value pairs and
// the leading KafkaConsumer constructor arguments are not visible in this
// excerpt — confirm against the full file.
210 Consumer<String, AbstractMessageTo> infoChannelConsumer(
211 Properties defaultConsumerProperties,
212 ChatBackendProperties chatBackendProperties,
213 StringDeserializer stringDeserializer,
214 JsonDeserializer<AbstractMessageTo> messageDeserializer)
216 Map<String, Object> properties = new HashMap<>();
217 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
219 ConsumerConfig.CLIENT_ID_CONFIG,
220 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
222 ConsumerConfig.GROUP_ID_CONFIG,
224 return new KafkaConsumer<>(
227 messageDeserializer);
// Creates the Kafka consumer used for the data channel.
// The default consumer properties are copied into a fresh map (keys converted
// with toString()); the client id is the configured client-id prefix followed by
// "_DATA_CHANNEL_CONSUMER", and a group id is configured as well.
// NOTE(review): the group-id value, the calls receiving the key/value pairs and
// the leading KafkaConsumer constructor arguments are not visible in this
// excerpt — confirm against the full file.
231 Consumer<String, AbstractMessageTo> dataChannelConsumer(
232 Properties defaultConsumerProperties,
233 ChatBackendProperties chatBackendProperties,
234 StringDeserializer stringDeserializer,
235 JsonDeserializer<AbstractMessageTo> messageDeserializer)
237 Map<String, Object> properties = new HashMap<>();
238 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
240 ConsumerConfig.CLIENT_ID_CONFIG,
241 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
243 ConsumerConfig.GROUP_ID_CONFIG,
245 return new KafkaConsumer<>(
248 messageDeserializer);
// Provides a new StringDeserializer instance (both consumer methods in this file
// declare a parameter of this type; presumably it deserializes the record keys).
252 StringDeserializer stringDeserializer()
254 return new StringDeserializer();
// Creates the JsonDeserializer for AbstractMessageTo and configures it with
// - the package of this class (getClass().getPackageName()) as trusted package and
// - the given type mappings (JsonDeserializer.TYPE_MAPPINGS).
// NOTE(review): the remaining arguments of configure(...) — the enclosing map
// construction and the isKey flag — are not visible in this excerpt.
258 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
260 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
261 deserializer.configure(
263 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
264 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
// Builds the type-mapping string shared by the JSON serializer and deserializer:
// a comma-separated list of "<type id>:<canonical class name>" entries, mapping
// "event_chatroom_created" to EventChatRoomCreated and
// "event_chatmessage_received" to EventChatMessageReceivedTo.
270 String typeMappings ()
273 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
274 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
// Creates the base Properties for producers; the visible lines set the bootstrap
// servers from the chat backend's Kafka configuration.
278 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
280 Properties properties = new Properties();
281 properties.setProperty(
282 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
283 chatBackendProperties.getKafka().getBootstrapServers());
// Creates the base Properties shared by the consumers: bootstrap servers from the
// chat backend's Kafka configuration, plus explicit settings for
// enable.auto.commit and auto.offset.reset.
// NOTE(review): the values assigned to ENABLE_AUTO_COMMIT_CONFIG and
// AUTO_OFFSET_RESET_CONFIG are not visible in this excerpt — confirm them
// against the full file before relying on the commit/reset behavior.
288 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
290 Properties properties = new Properties();
291 properties.setProperty(
292 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
293 chatBackendProperties.getKafka().getBootstrapServers());
294 properties.setProperty(
295 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
297 properties.setProperty(
298 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
// Creates the HaproxyShardingPublisherStrategy.
// The configured HAProxy runtime API endpoint is expected in "host:port" form: it
// is split on ':' and turned into an InetSocketAddress. The visible constructor
// arguments are the configured HAProxy map and the instance id; one argument
// before them is not visible in this excerpt (presumably haproxyAddress).
304 ShardingPublisherStrategy shardingPublisherStrategy(
305 ChatBackendProperties properties)
// NOTE(review): no validation of the configured value — a value without ':'
// fails with ArrayIndexOutOfBoundsException on parts[1], a non-numeric port
// with NumberFormatException, and an IPv6 literal (which itself contains ':')
// would be split incorrectly.
307 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
// NOTE(review): Integer.valueOf boxes and is unboxed again for the int
// parameter; Integer.parseInt would avoid the detour.
308 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
309 return new HaproxyShardingPublisherStrategy(
311 properties.getKafka().getHaproxyMap(),
312 properties.getInstanceId());
318 return ZoneId.systemDefault();