1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
32 import java.util.Properties;
35 @ConditionalOnProperty(
36 prefix = "chat.backend",
38 havingValue = "kafka")
40 public class KafkaServicesConfiguration
43 ConsumerTaskRunner consumerTaskRunner(
44 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46 InfoChannel infoChannel)
48 return new ConsumerTaskRunner(
49 infoChannelConsumerTaskExecutor,
50 dataChannelConsumerTaskExecutor,
55 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56 ThreadPoolTaskExecutor taskExecutor,
57 InfoChannel infoChannel,
58 Consumer<String, AbstractMessageTo> infoChannelConsumer,
59 WorkAssignor infoChannelWorkAssignor)
61 return new ConsumerTaskExecutor(
65 infoChannelWorkAssignor);
69 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
73 String topic = properties.getKafka().getInfoChannelTopic();
74 List<TopicPartition> partitions = consumer
78 new TopicPartition(topic, partitionInfo.partition()))
80 consumer.assign(partitions);
85 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86 ThreadPoolTaskExecutor taskExecutor,
87 DataChannel dataChannel,
88 Consumer<String, AbstractMessageTo> dataChannelConsumer,
89 WorkAssignor dataChannelWorkAssignor)
91 return new ConsumerTaskExecutor(
95 dataChannelWorkAssignor);
99 WorkAssignor dataChannelWorkAssignor(
100 ChatBackendProperties properties,
101 DataChannel dataChannel)
105 List<String> topics =
106 List.of(properties.getKafka().getDataChannelTopic());
107 consumer.subscribe(topics, dataChannel);
112 ChatHomeService kafkaChatHome(
113 ChatBackendProperties properties,
114 InfoChannel infoChannel,
115 DataChannel dataChannel)
117 return new KafkaChatHomeService(
118 properties.getKafka().getNumPartitions(),
124 InfoChannel infoChannel(
125 ChatBackendProperties properties,
126 Producer<String, AbstractMessageTo> producer,
127 Consumer<String, AbstractMessageTo> infoChannelConsumer)
129 return new InfoChannel(
130 properties.getKafka().getInfoChannelTopic(),
133 properties.getKafka().getInstanceUri());
137 DataChannel dataChannel(
138 ChatBackendProperties properties,
139 Producer<String, AbstractMessageTo> producer,
140 Consumer<String, AbstractMessageTo> dataChannelConsumer,
143 InfoChannel infoChannel,
144 ShardingPublisherStrategy shardingPublisherStrategy)
146 return new DataChannel(
147 properties.getInstanceId(),
148 properties.getKafka().getDataChannelTopic(),
152 properties.getKafka().getNumPartitions(),
153 properties.getChatroomBufferSize(),
156 shardingPublisherStrategy);
160 Producer<String, AbstractMessageTo> producer(
161 Properties defaultProducerProperties,
162 ChatBackendProperties chatBackendProperties,
163 StringSerializer stringSerializer,
164 JsonSerializer<AbstractMessageTo> messageSerializer)
166 Map<String, Object> properties = new HashMap<>();
167 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
169 ProducerConfig.CLIENT_ID_CONFIG,
170 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
171 return new KafkaProducer<>(
178 StringSerializer stringSerializer()
180 return new StringSerializer();
184 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
186 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
187 serializer.configure(
189 JsonSerializer.TYPE_MAPPINGS, typeMappings),
195 Consumer<String, AbstractMessageTo> infoChannelConsumer(
196 Properties defaultConsumerProperties,
197 ChatBackendProperties chatBackendProperties,
198 StringDeserializer stringDeserializer,
199 JsonDeserializer<AbstractMessageTo> messageDeserializer)
201 Map<String, Object> properties = new HashMap<>();
202 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
204 ConsumerConfig.CLIENT_ID_CONFIG,
205 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
207 ConsumerConfig.GROUP_ID_CONFIG,
209 return new KafkaConsumer<>(
212 messageDeserializer);
216 Consumer<String, AbstractMessageTo> dataChannelConsumer(
217 Properties defaultConsumerProperties,
218 ChatBackendProperties chatBackendProperties,
219 StringDeserializer stringDeserializer,
220 JsonDeserializer<AbstractMessageTo> messageDeserializer)
222 Map<String, Object> properties = new HashMap<>();
223 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
225 ConsumerConfig.CLIENT_ID_CONFIG,
226 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
228 ConsumerConfig.GROUP_ID_CONFIG,
230 return new KafkaConsumer<>(
233 messageDeserializer);
237 StringDeserializer stringDeserializer()
239 return new StringDeserializer();
243 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
245 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
246 deserializer.configure(
248 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
249 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
255 String typeMappings ()
258 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
259 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
263 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
265 Properties properties = new Properties();
266 properties.setProperty(
267 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
268 chatBackendProperties.getKafka().getBootstrapServers());
273 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
275 Properties properties = new Properties();
276 properties.setProperty(
277 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
278 chatBackendProperties.getKafka().getBootstrapServers());
279 properties.setProperty(
280 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
282 properties.setProperty(
283 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
289 ShardingPublisherStrategy shardingPublisherStrategy(
290 ChatBackendProperties properties)
292 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
293 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
294 return new HaproxyShardingPublisherStrategy(
296 properties.getKafka().getHaproxyMap(),
297 properties.getInstanceId());
303 return ZoneId.systemDefault();