1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
32 import java.util.Properties;
/**
 * Spring wiring for the Kafka-backed implementation of the chat backend.
 * <p>
 * The condition below restricts activation to the case where a property
 * under the prefix {@code chat.backend} has the value {@code kafka}.
 * NOTE(review): the property name and any further class-level annotations
 * are not visible in this excerpt - confirm against the full file.
 */
35 @ConditionalOnProperty(
36 prefix = "chat.backend",
38 havingValue = "kafka")
40 public class KafkaServicesConfiguration
/**
 * Creates the {@link ConsumerTaskRunner}.
 *
 * @param infoChannelConsumerTaskExecutor passed as first constructor argument
 * @param dataChannelConsumerTaskExecutor passed as second constructor argument
 * @param infoChannel NOTE(review): its usage is not visible in this excerpt -
 *        presumably the remaining constructor argument; confirm
 * @return a new {@code ConsumerTaskRunner} instance
 */
43 ConsumerTaskRunner consumerTaskRunner(
44 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46 InfoChannel infoChannel)
48 return new ConsumerTaskRunner(
49 infoChannelConsumerTaskExecutor,
50 dataChannelConsumerTaskExecutor,
/**
 * Creates the {@link ConsumerTaskExecutor} that belongs to the info channel.
 * <p>
 * Only the last constructor argument ({@code infoChannelWorkAssignor}) is
 * visible in this excerpt.
 * NOTE(review): presumably {@code taskExecutor}, {@code infoChannel} and
 * {@code infoChannelConsumer} are the preceding arguments - confirm.
 *
 * @param taskExecutor thread pool executor handed in by the container
 * @param infoChannel the info channel
 * @param infoChannelConsumer consumer for {@code AbstractMessageTo} values keyed by {@code String}
 * @param infoChannelWorkAssignor passed as last constructor argument
 * @return a new {@code ConsumerTaskExecutor} instance
 */
55 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56 ThreadPoolTaskExecutor taskExecutor,
57 InfoChannel infoChannel,
58 Consumer<String, AbstractMessageTo> infoChannelConsumer,
59 WorkAssignor infoChannelWorkAssignor)
61 return new ConsumerTaskExecutor(
65 infoChannelWorkAssignor);
/**
 * Creates the {@link WorkAssignor} for the info channel.
 * <p>
 * The visible logic reads the info-channel topic name from the configuration,
 * builds one {@link TopicPartition} of that topic per partition info and
 * hands the resulting list to {@code consumer.assign(...)}.
 * NOTE(review): where {@code consumer} comes from and how the partition
 * infos are looked up is elided in this excerpt - presumably a lambda
 * parameter and {@code partitionsFor(topic)}; confirm.
 *
 * @param properties source of the info-channel topic name
 * @return the work assignor for the info channel
 */
69 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
73 String topic = properties.getKafka().getInfoChannelTopic();
74 List<TopicPartition> partitions = consumer
78 new TopicPartition(topic, partitionInfo.partition()))
// Explicit assignment of the collected partitions (as opposed to the
// topic subscription used for the data channel).
80 consumer.assign(partitions);
/**
 * Creates the {@link ConsumerTaskExecutor} that belongs to the data channel.
 * <p>
 * Only the last constructor argument ({@code dataChannelWorkAssignor}) is
 * visible in this excerpt.
 * NOTE(review): presumably {@code taskExecutor}, {@code dataChannel} and
 * {@code dataChannelConsumer} are the preceding arguments - confirm.
 *
 * @param taskExecutor thread pool executor handed in by the container
 * @param dataChannel the data channel
 * @param dataChannelConsumer consumer for {@code AbstractMessageTo} values keyed by {@code String}
 * @param dataChannelWorkAssignor passed as last constructor argument
 * @return a new {@code ConsumerTaskExecutor} instance
 */
85 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86 ThreadPoolTaskExecutor taskExecutor,
87 DataChannel dataChannel,
88 Consumer<String, AbstractMessageTo> dataChannelConsumer,
89 WorkAssignor dataChannelWorkAssignor)
91 return new ConsumerTaskExecutor(
95 dataChannelWorkAssignor);
/**
 * Creates the {@link WorkAssignor} for the data channel.
 * <p>
 * The visible logic subscribes {@code consumer} to a single-element list
 * holding the configured data-channel topic and passes {@code dataChannel}
 * as second argument of {@code subscribe}.
 * NOTE(review): where {@code consumer} comes from is elided in this excerpt;
 * {@code dataChannel} presumably acts as the rebalance listener - confirm.
 *
 * @param properties source of the data-channel topic name
 * @param dataChannel passed along with the subscription
 * @return the work assignor for the data channel
 */
99 WorkAssignor dataChannelWorkAssignor(
100 ChatBackendProperties properties,
101 DataChannel dataChannel)
105 List<String> topics =
106 List.of(properties.getKafka().getDataChannelTopic());
107 consumer.subscribe(topics, dataChannel);
/**
 * Creates the {@link KafkaChatHomeService}.
 * <p>
 * The configured number of partitions is the first constructor argument.
 * NOTE(review): the remaining arguments are elided in this excerpt -
 * presumably {@code infoChannel} and {@code dataChannel}; confirm.
 *
 * @param properties source of the number of partitions
 * @param infoChannel the info channel
 * @param dataChannel the data channel
 * @return a new {@code KafkaChatHomeService} instance
 */
112 KafkaChatHomeService kafkaChatHome(
113 ChatBackendProperties properties,
114 InfoChannel infoChannel,
115 DataChannel dataChannel)
117 return new KafkaChatHomeService(
118 properties.getKafka().getNumPartitions(),
/**
 * Creates the {@link InfoChannel}.
 * <p>
 * From the configuration it receives the info-channel topic, the polling
 * interval, the number of partitions and the instance URI.
 * NOTE(review): the arguments between the topic and the polling interval
 * are elided in this excerpt - presumably {@code producer} and
 * {@code infoChannelConsumer}; confirm.
 *
 * @param properties source of the configuration values listed above
 * @param producer producer for {@code AbstractMessageTo} values keyed by {@code String}
 * @param infoChannelConsumer consumer for {@code AbstractMessageTo} values keyed by {@code String}
 * @return a new {@code InfoChannel} instance
 */
124 InfoChannel infoChannel(
125 ChatBackendProperties properties,
126 Producer<String, AbstractMessageTo> producer,
127 Consumer<String, AbstractMessageTo> infoChannelConsumer)
129 return new InfoChannel(
130 properties.getKafka().getInfoChannelTopic(),
133 properties.getKafka().getPollingInterval(),
134 properties.getKafka().getNumPartitions(),
135 properties.getKafka().getInstanceUri());
/**
 * Creates the {@link DataChannel}.
 * <p>
 * From the configuration it receives the instance id, the data-channel
 * topic, the number of partitions, the polling interval and the chat-room
 * buffer size; {@code shardingPublisherStrategy} is the last constructor
 * argument.
 * NOTE(review): two parameters of this method and several constructor
 * arguments are elided in this excerpt - confirm against the full file.
 *
 * @param properties source of the configuration values listed above
 * @param producer producer for {@code AbstractMessageTo} values keyed by {@code String}
 * @param dataChannelConsumer consumer for {@code AbstractMessageTo} values keyed by {@code String}
 * @param infoChannel the info channel
 * @param shardingPublisherStrategy passed as last constructor argument
 * @return a new {@code DataChannel} instance
 */
139 DataChannel dataChannel(
140 ChatBackendProperties properties,
141 Producer<String, AbstractMessageTo> producer,
142 Consumer<String, AbstractMessageTo> dataChannelConsumer,
145 InfoChannel infoChannel,
146 ShardingPublisherStrategy shardingPublisherStrategy)
148 return new DataChannel(
149 properties.getInstanceId(),
150 properties.getKafka().getDataChannelTopic(),
154 properties.getKafka().getNumPartitions(),
155 properties.getKafka().getPollingInterval(),
156 properties.getChatroomBufferSize(),
159 shardingPublisherStrategy);
/**
 * Creates the {@link KafkaProducer} for {@code AbstractMessageTo} values
 * keyed by {@code String}.
 * <p>
 * The default producer properties are copied into a fresh map, and a client
 * id consisting of the configured prefix plus {@code "_PRODUCER"} is built.
 * NOTE(review): the call that stores the client id and the constructor
 * arguments are elided in this excerpt - presumably the map and the two
 * serializers; confirm.
 *
 * @param defaultProducerProperties base settings that are copied
 * @param chatBackendProperties source of the client-id prefix
 * @param stringSerializer serializer for {@code String}
 * @param messageSerializer serializer for {@code AbstractMessageTo}
 * @return a new {@code KafkaProducer} instance
 */
163 Producer<String, AbstractMessageTo> producer(
164 Properties defaultProducerProperties,
165 ChatBackendProperties chatBackendProperties,
166 StringSerializer stringSerializer,
167 JsonSerializer<AbstractMessageTo> messageSerializer)
169 Map<String, Object> properties = new HashMap<>();
// Re-key the defaults via toString() so that they fit Map<String, Object>.
170 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
172 ProducerConfig.CLIENT_ID_CONFIG,
173 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
174 return new KafkaProducer<>(
/**
 * Provides a fresh {@link StringSerializer}.
 *
 * @return a new {@code StringSerializer} instance
 */
181 StringSerializer stringSerializer()
183 return new StringSerializer();
/**
 * Creates the {@link JsonSerializer} for {@code AbstractMessageTo} and
 * configures it with the given type mappings.
 * NOTE(review): the remaining arguments of {@code configure(...)} and the
 * return statement are elided in this excerpt.
 *
 * @param typeMappings value used for {@code JsonSerializer.TYPE_MAPPINGS}
 * @return presumably the configured serializer - confirm
 */
187 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
189 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
190 serializer.configure(
192 JsonSerializer.TYPE_MAPPINGS, typeMappings),
/**
 * Creates the {@link KafkaConsumer} used for the info channel.
 * <p>
 * The default consumer properties are copied into a fresh map; a client id
 * consisting of the configured prefix plus {@code "_INFO_CHANNEL_CONSUMER"}
 * and a group id are set on top. {@code messageDeserializer} is the last
 * constructor argument.
 * NOTE(review): the group-id value and the leading constructor arguments
 * are elided in this excerpt - confirm against the full file.
 *
 * @param defaultConsumerProperties base settings that are copied
 * @param chatBackendProperties source of the client-id prefix
 * @param stringDeserializer deserializer for {@code String}
 * @param messageDeserializer deserializer for {@code AbstractMessageTo}
 * @return a new {@code KafkaConsumer} instance
 */
198 Consumer<String, AbstractMessageTo> infoChannelConsumer(
199 Properties defaultConsumerProperties,
200 ChatBackendProperties chatBackendProperties,
201 StringDeserializer stringDeserializer,
202 JsonDeserializer<AbstractMessageTo> messageDeserializer)
204 Map<String, Object> properties = new HashMap<>();
// Re-key the defaults via toString() so that they fit Map<String, Object>.
205 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
207 ConsumerConfig.CLIENT_ID_CONFIG,
208 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
210 ConsumerConfig.GROUP_ID_CONFIG,
212 return new KafkaConsumer<>(
215 messageDeserializer);
/**
 * Creates the {@link KafkaConsumer} used for the data channel.
 * <p>
 * The default consumer properties are copied into a fresh map; a client id
 * consisting of the configured prefix plus {@code "_DATA_CHANNEL_CONSUMER"}
 * and a group id are set on top. {@code messageDeserializer} is the last
 * constructor argument.
 * NOTE(review): the group-id value and the leading constructor arguments
 * are elided in this excerpt - confirm against the full file.
 *
 * @param defaultConsumerProperties base settings that are copied
 * @param chatBackendProperties source of the client-id prefix
 * @param stringDeserializer deserializer for {@code String}
 * @param messageDeserializer deserializer for {@code AbstractMessageTo}
 * @return a new {@code KafkaConsumer} instance
 */
219 Consumer<String, AbstractMessageTo> dataChannelConsumer(
220 Properties defaultConsumerProperties,
221 ChatBackendProperties chatBackendProperties,
222 StringDeserializer stringDeserializer,
223 JsonDeserializer<AbstractMessageTo> messageDeserializer)
225 Map<String, Object> properties = new HashMap<>();
// Re-key the defaults via toString() so that they fit Map<String, Object>.
226 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
228 ConsumerConfig.CLIENT_ID_CONFIG,
229 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
231 ConsumerConfig.GROUP_ID_CONFIG,
233 return new KafkaConsumer<>(
236 messageDeserializer);
/**
 * Provides a fresh {@link StringDeserializer}.
 *
 * @return a new {@code StringDeserializer} instance
 */
240 StringDeserializer stringDeserializer()
242 return new StringDeserializer();
/**
 * Creates the {@link JsonDeserializer} for {@code AbstractMessageTo} and
 * configures its trusted packages and type mappings.
 * NOTE(review): the remaining arguments of {@code configure(...)} and the
 * return statement are elided in this excerpt.
 *
 * @param typeMappings value used for {@code JsonDeserializer.TYPE_MAPPINGS}
 * @return presumably the configured deserializer - confirm
 */
246 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
248 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
249 deserializer.configure(
// The trusted package is the package of this configuration class.
// NOTE(review): the message classes imported by this file live in the
// sub-packages "messages.data" and "messages.info" - confirm that they
// are accepted by the deserializer with this setting.
251 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
252 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
/**
 * Builds the type-mapping string shared by the JSON serializer and
 * deserializer: a comma-separated list of {@code token:canonicalClassName}
 * pairs, mapping {@code event_chatroom_created} to
 * {@link EventChatRoomCreated} and {@code event_chatmessage_received} to
 * {@link EventChatMessageReceivedTo}.
 *
 * @return the type-mapping string
 */
258 String typeMappings ()
261 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
262 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
/**
 * Creates the base settings for Kafka producers: a {@link Properties}
 * object that carries the configured bootstrap servers.
 * NOTE(review): the return statement is elided in this excerpt.
 *
 * @param chatBackendProperties source of the bootstrap servers
 * @return presumably the populated properties - confirm
 */
266 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
268 Properties properties = new Properties();
269 properties.setProperty(
270 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
271 chatBackendProperties.getKafka().getBootstrapServers());
/**
 * Creates the base settings for Kafka consumers: a {@link Properties}
 * object that carries the configured bootstrap servers plus values for
 * {@code ENABLE_AUTO_COMMIT_CONFIG} and {@code AUTO_OFFSET_RESET_CONFIG}.
 * NOTE(review): the values of the latter two settings and the return
 * statement are elided in this excerpt - confirm against the full file.
 *
 * @param chatBackendProperties source of the bootstrap servers
 * @return presumably the populated properties - confirm
 */
276 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
278 Properties properties = new Properties();
279 properties.setProperty(
280 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
281 chatBackendProperties.getKafka().getBootstrapServers());
282 properties.setProperty(
283 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
285 properties.setProperty(
286 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
/**
 * Creates the {@link HaproxyShardingPublisherStrategy}.
 * <p>
 * The configured HAProxy runtime-API setting is split at {@code ":"}; the
 * first part is used as host and the second part as port of an
 * {@link InetSocketAddress}. The configured HAProxy map and the instance id
 * are passed to the strategy as well.
 * NOTE(review): the first constructor argument is elided in this excerpt -
 * presumably {@code haproxyAddress}; confirm.
 *
 * @param properties source of the runtime-API setting, the map and the instance id
 * @return a new {@code HaproxyShardingPublisherStrategy} instance
 */
292 ShardingPublisherStrategy shardingPublisherStrategy(
293 ChatBackendProperties properties)
// NOTE(review): no validation of the "host:port" shape happens here - a
// value without ":" fails on parts[1], a non-numeric port fails in
// Integer.valueOf, and a value with more than one ":" (e.g. an IPv6
// literal) would be split in the wrong place.
295 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
296 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
297 return new HaproxyShardingPublisherStrategy(
299 properties.getKafka().getHaproxyMap(),
300 properties.getInstanceId());
306 return ZoneId.systemDefault();