1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.KafkaConsumer;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import reactor.core.publisher.Mono;
26 import java.time.Clock;
27 import java.time.ZoneId;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.Properties;
// Wiring of the Kafka-based chat backend services.
// The condition below activates this class only when a property under the
// "chat.backend" prefix has the value "kafka".
34 @ConditionalOnProperty(
35 prefix = "chat.backend",
37 havingValue = "kafka")
39 public class KafkaServicesConfiguration
// Creates the ConsumerTaskRunner. The executors for the info channel and the
// data channel are handed to its constructor in that order.
42 ConsumerTaskRunner consumerTaskRunner(
43 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
44 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
45 InfoChannel infoChannel)
47 return new ConsumerTaskRunner(
48 infoChannelConsumerTaskExecutor,
49 dataChannelConsumerTaskExecutor,
// Creates the ConsumerTaskExecutor dedicated to the info channel.
// The info-channel specific WorkAssignor is the last constructor argument.
54 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
55 ThreadPoolTaskExecutor taskExecutor,
56 InfoChannel infoChannel,
57 Consumer<String, AbstractMessageTo> infoChannelConsumer,
58 WorkAssignor infoChannelWorkAssignor)
60 return new ConsumerTaskExecutor(
64 infoChannelWorkAssignor);
// WorkAssignor for the info channel: the partitions of the configured
// info-channel topic are assigned to the consumer explicitly.
68 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
// Topic name comes from the Kafka section of the backend properties.
72 String topic = properties.getKafka().getInfoChannelTopic();
// One TopicPartition per partition info of that topic.
73 List<TopicPartition> partitions = consumer
77 new TopicPartition(topic, partitionInfo.partition()))
// Manual assignment via assign(), in contrast to the subscribe() call used
// by the data channel's work assignor.
79 consumer.assign(partitions);
// Creates the ConsumerTaskExecutor dedicated to the data channel.
// The data-channel specific WorkAssignor is the last constructor argument.
84 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
85 ThreadPoolTaskExecutor taskExecutor,
86 DataChannel dataChannel,
87 Consumer<String, AbstractMessageTo> dataChannelConsumer,
88 WorkAssignor dataChannelWorkAssignor)
90 return new ConsumerTaskExecutor(
94 dataChannelWorkAssignor);
// WorkAssignor for the data channel: subscribes the consumer to the
// configured data-channel topic.
98 WorkAssignor dataChannelWorkAssignor(
99 ChatBackendProperties properties,
100 DataChannel dataChannel)
// Single-element list holding the data-channel topic name.
104 List<String> topics =
105 List.of(properties.getKafka().getDataChannelTopic());
// NOTE(review): dataChannel is passed as the second argument of subscribe() —
// presumably it serves as the rebalance listener; confirm against DataChannel.
106 consumer.subscribe(topics, dataChannel);
// Provides the ChatHomeService as a KafkaChatHomeService.
// The configured number of partitions is the first constructor argument.
111 ChatHomeService kafkaChatHome(
112 ChatBackendProperties properties,
113 InfoChannel infoChannel,
114 DataChannel dataChannel)
116 return new KafkaChatHomeService(
117 properties.getKafka().getNumPartitions(),
// Creates the InfoChannel. The configured info-channel topic is the first
// constructor argument and the configured instance URI is the last one.
123 InfoChannel infoChannel(
124 ChatBackendProperties properties,
125 Producer<String, AbstractMessageTo> producer,
126 Consumer<String, AbstractMessageTo> infoChannelConsumer)
128 return new InfoChannel(
129 properties.getKafka().getInfoChannelTopic(),
132 properties.getKafka().getInstanceUri());
// Creates the DataChannel. Values read from the backend properties are the
// instance id, the data-channel topic, the number of partitions and the
// chat-room buffer size; the ShardingPublisherStrategy is the last argument.
136 DataChannel dataChannel(
137 ChatBackendProperties properties,
138 Producer<String, AbstractMessageTo> producer,
139 Consumer<String, AbstractMessageTo> dataChannelConsumer,
142 InfoChannel infoChannel,
143 ShardingPublisherStrategy shardingPublisherStrategy)
145 return new DataChannel(
// The instance id is read from the top-level properties; the topic and the
// partition count come from the nested Kafka section.
146 properties.getInstanceId(),
147 properties.getKafka().getDataChannelTopic(),
151 properties.getKafka().getNumPartitions(),
152 properties.getChatroomBufferSize(),
155 shardingPublisherStrategy);
// Creates the Kafka producer for AbstractMessageTo values with String keys.
159 Producer<String, AbstractMessageTo> producer(
160 Properties defaultProducerProperties,
161 ChatBackendProperties chatBackendProperties,
162 StringSerializer stringSerializer,
163 JsonSerializer<AbstractMessageTo> messageSerializer)
// Copy the shared defaults into a String-keyed map, so that the producer
// specific entries do not modify the shared Properties instance.
165 Map<String, Object> properties = new HashMap<>();
166 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: the configured client-id prefix followed by "_PRODUCER".
168 ProducerConfig.CLIENT_ID_CONFIG,
169 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
170 return new KafkaProducer<>(
// Provides a new StringSerializer instance.
177 StringSerializer stringSerializer()
179 return new StringSerializer();
// Creates the JSON serializer for AbstractMessageTo and configures it with the
// given type mappings.
183 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
185 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
186 serializer.configure(
// typeMappings is the value of the TYPE_MAPPINGS configuration entry.
188 JsonSerializer.TYPE_MAPPINGS, typeMappings),
// Creates the Kafka consumer that is used for the info channel.
194 Consumer<String, AbstractMessageTo> infoChannelConsumer(
195 Properties defaultConsumerProperties,
196 ChatBackendProperties chatBackendProperties,
197 StringDeserializer stringDeserializer,
198 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copy the shared defaults into a String-keyed map, so that the consumer
// specific entries do not modify the shared Properties instance.
200 Map<String, Object> properties = new HashMap<>();
201 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: the configured client-id prefix followed by
// "_INFO_CHANNEL_CONSUMER".
203 ConsumerConfig.CLIENT_ID_CONFIG,
204 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
// A group id is configured for this consumer as well.
206 ConsumerConfig.GROUP_ID_CONFIG,
// The JSON message deserializer is the last constructor argument.
208 return new KafkaConsumer<>(
211 messageDeserializer);
// Creates the Kafka consumer that is used for the data channel.
215 Consumer<String, AbstractMessageTo> dataChannelConsumer(
216 Properties defaultConsumerProperties,
217 ChatBackendProperties chatBackendProperties,
218 StringDeserializer stringDeserializer,
219 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copy the shared defaults into a String-keyed map, so that the consumer
// specific entries do not modify the shared Properties instance.
221 Map<String, Object> properties = new HashMap<>();
222 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: the configured client-id prefix followed by
// "_DATA_CHANNEL_CONSUMER".
224 ConsumerConfig.CLIENT_ID_CONFIG,
225 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
// A group id is configured for this consumer as well.
227 ConsumerConfig.GROUP_ID_CONFIG,
// The JSON message deserializer is the last constructor argument.
229 return new KafkaConsumer<>(
232 messageDeserializer);
// Provides a new StringDeserializer instance.
236 StringDeserializer stringDeserializer()
238 return new StringDeserializer();
// Creates the JSON deserializer for AbstractMessageTo and configures its
// trusted packages and type mappings.
242 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
244 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
245 deserializer.configure(
// Only the package of this configuration class is listed as trusted.
// NOTE(review): the mapped message classes are imported from sub-packages
// (messages.data / messages.info) — presumably they are accepted through the
// type mappings rather than through this entry; confirm.
247 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
248 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
// Builds the type-mapping string shared by the JSON serializer and the JSON
// deserializer: comma-separated "<type id>:<canonical class name>" pairs.
254 String typeMappings ()
257 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
258 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
// Default settings for producers: the bootstrap servers are taken from the
// Kafka section of the backend properties.
262 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
264 Properties properties = new Properties();
265 properties.setProperty(
266 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
267 chatBackendProperties.getKafka().getBootstrapServers());
// Default settings for consumers: the bootstrap servers from the Kafka section
// of the backend properties, plus entries for auto-commit and offset reset.
272 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
274 Properties properties = new Properties();
275 properties.setProperty(
276 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
277 chatBackendProperties.getKafka().getBootstrapServers());
// Explicit setting for the auto-commit behaviour.
278 properties.setProperty(
279 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
// Explicit setting for the offset-reset policy.
281 properties.setProperty(
282 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
// Provides a ShardingPublisherStrategy as an anonymous class.
288 ShardingPublisherStrategy shardingPublisherStrategy()
290 return new ShardingPublisherStrategy() {
// Emits the shard number as its decimal string, wrapped in an already
// completed Mono.
292 public Mono<String> publishOwnership(int shard)
294 return Mono.just(Integer.toString(shard));
302 return ZoneId.systemDefault();