1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
9 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Configuration;
22 import org.springframework.kafka.support.serializer.JsonDeserializer;
23 import org.springframework.kafka.support.serializer.JsonSerializer;
24 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26 import java.net.InetSocketAddress;
27 import java.time.Clock;
28 import java.time.ZoneId;
29 import java.util.HashMap;
30 import java.util.List;
32 import java.util.Properties;
35 @ConditionalOnProperty(
36 prefix = "chat.backend",
38 havingValue = "kafka")
40 public class KafkaServicesConfiguration
43 ConsumerTaskRunner consumerTaskRunner(
44 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
45 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
46 InfoChannel infoChannel)
48 return new ConsumerTaskRunner(
49 infoChannelConsumerTaskExecutor,
50 dataChannelConsumerTaskExecutor,
55 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
56 ThreadPoolTaskExecutor taskExecutor,
57 InfoChannel infoChannel,
58 Consumer<String, AbstractMessageTo> infoChannelConsumer,
59 WorkAssignor infoChannelWorkAssignor)
61 return new ConsumerTaskExecutor(
65 infoChannelWorkAssignor);
69 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
73 String topic = properties.getKafka().getInfoChannelTopic();
74 List<TopicPartition> partitions = consumer
78 new TopicPartition(topic, partitionInfo.partition()))
80 consumer.assign(partitions);
85 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
86 ThreadPoolTaskExecutor taskExecutor,
87 DataChannel dataChannel,
88 Consumer<String, AbstractMessageTo> dataChannelConsumer,
89 WorkAssignor dataChannelWorkAssignor)
91 return new ConsumerTaskExecutor(
95 dataChannelWorkAssignor);
99 WorkAssignor dataChannelWorkAssignor(
100 ChatBackendProperties properties,
101 DataChannel dataChannel)
105 List<String> topics =
106 List.of(properties.getKafka().getDataChannelTopic());
107 consumer.subscribe(topics, dataChannel);
112 KafkaChatHomeService kafkaChatHome(
113 ChatBackendProperties properties,
114 InfoChannel infoChannel,
115 DataChannel dataChannel)
117 return new KafkaChatHomeService(
118 properties.getKafka().getNumPartitions(),
124 InfoChannel infoChannel(
125 ChatBackendProperties properties,
126 Producer<String, AbstractMessageTo> producer,
127 Consumer<String, AbstractMessageTo> infoChannelConsumer)
129 return new InfoChannel(
130 properties.getKafka().getInfoChannelTopic(),
133 properties.getKafka().getNumPartitions(),
134 properties.getKafka().getInstanceUri());
138 DataChannel dataChannel(
139 ChatBackendProperties properties,
140 Producer<String, AbstractMessageTo> producer,
141 Consumer<String, AbstractMessageTo> dataChannelConsumer,
144 InfoChannel infoChannel,
145 ShardingPublisherStrategy shardingPublisherStrategy)
147 return new DataChannel(
148 properties.getInstanceId(),
149 properties.getKafka().getDataChannelTopic(),
153 properties.getKafka().getNumPartitions(),
154 properties.getChatroomBufferSize(),
157 shardingPublisherStrategy);
161 Producer<String, AbstractMessageTo> producer(
162 Properties defaultProducerProperties,
163 ChatBackendProperties chatBackendProperties,
164 StringSerializer stringSerializer,
165 JsonSerializer<AbstractMessageTo> messageSerializer)
167 Map<String, Object> properties = new HashMap<>();
168 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
170 ProducerConfig.CLIENT_ID_CONFIG,
171 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
172 return new KafkaProducer<>(
179 StringSerializer stringSerializer()
181 return new StringSerializer();
185 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
187 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
188 serializer.configure(
190 JsonSerializer.TYPE_MAPPINGS, typeMappings),
196 Consumer<String, AbstractMessageTo> infoChannelConsumer(
197 Properties defaultConsumerProperties,
198 ChatBackendProperties chatBackendProperties,
199 StringDeserializer stringDeserializer,
200 JsonDeserializer<AbstractMessageTo> messageDeserializer)
202 Map<String, Object> properties = new HashMap<>();
203 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
205 ConsumerConfig.CLIENT_ID_CONFIG,
206 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
208 ConsumerConfig.GROUP_ID_CONFIG,
210 return new KafkaConsumer<>(
213 messageDeserializer);
217 Consumer<String, AbstractMessageTo> dataChannelConsumer(
218 Properties defaultConsumerProperties,
219 ChatBackendProperties chatBackendProperties,
220 StringDeserializer stringDeserializer,
221 JsonDeserializer<AbstractMessageTo> messageDeserializer)
223 Map<String, Object> properties = new HashMap<>();
224 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
226 ConsumerConfig.CLIENT_ID_CONFIG,
227 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
229 ConsumerConfig.GROUP_ID_CONFIG,
231 return new KafkaConsumer<>(
234 messageDeserializer);
238 StringDeserializer stringDeserializer()
240 return new StringDeserializer();
244 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
246 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
247 deserializer.configure(
249 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
250 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
256 String typeMappings ()
259 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
260 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
264 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
266 Properties properties = new Properties();
267 properties.setProperty(
268 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
269 chatBackendProperties.getKafka().getBootstrapServers());
274 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
276 Properties properties = new Properties();
277 properties.setProperty(
278 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
279 chatBackendProperties.getKafka().getBootstrapServers());
280 properties.setProperty(
281 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
283 properties.setProperty(
284 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
290 ShardingPublisherStrategy shardingPublisherStrategy(
291 ChatBackendProperties properties)
293 String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
294 InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
295 return new HaproxyShardingPublisherStrategy(
297 properties.getKafka().getHaproxyMap(),
298 properties.getInstanceId());
304 return ZoneId.systemDefault();