1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
32 @ConditionalOnProperty(
33 prefix = "chat.backend",
35 havingValue = "kafka")
37 public class KafkaServicesConfiguration
40 ConsumerTaskRunner consumerTaskRunner(
41 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
43 InfoChannel infoChannel)
45 return new ConsumerTaskRunner(
46 infoChannelConsumerTaskExecutor,
47 dataChannelConsumerTaskExecutor,
52 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
53 ThreadPoolTaskExecutor taskExecutor,
54 InfoChannel infoChannel,
55 Consumer<String, AbstractMessageTo> infoChannelConsumer,
56 WorkAssignor infoChannelWorkAssignor)
58 return new ConsumerTaskExecutor(
62 infoChannelWorkAssignor);
66 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70 String topic = properties.getKafka().getInfoChannelTopic();
71 List<TopicPartition> partitions = consumer
75 new TopicPartition(topic, partitionInfo.partition()))
77 consumer.assign(partitions);
82 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
83 ThreadPoolTaskExecutor taskExecutor,
84 DataChannel dataChannel,
85 Consumer<String, AbstractMessageTo> dataChannelConsumer,
86 WorkAssignor dataChannelWorkAssignor)
88 return new ConsumerTaskExecutor(
92 dataChannelWorkAssignor);
96 WorkAssignor dataChannelWorkAssignor(
97 ChatBackendProperties properties,
98 DataChannel dataChannel)
102 List<String> topics =
103 List.of(properties.getKafka().getDataChannelTopic());
104 consumer.subscribe(topics, dataChannel);
109 ChatHomeService kafkaChatHome(
110 ChatBackendProperties properties,
111 InfoChannel infoChannel,
112 DataChannel dataChannel)
114 return new KafkaChatHomeService(
115 properties.getKafka().getNumPartitions(),
121 InfoChannel infoChannel(
122 ChatBackendProperties properties,
123 Producer<String, AbstractMessageTo> producer,
124 Consumer<String, AbstractMessageTo> infoChannelConsumer)
126 return new InfoChannel(
127 properties.getKafka().getInfoChannelTopic(),
130 properties.getKafka().getInstanceUri());
134 DataChannel dataChannel(
135 ChatBackendProperties properties,
136 Producer<String, AbstractMessageTo> producer,
137 Consumer<String, AbstractMessageTo> dataChannelConsumer,
140 InfoChannel infoChannel)
142 return new DataChannel(
143 properties.getKafka().getDataChannelTopic(),
147 properties.getKafka().getNumPartitions(),
148 properties.getChatroomBufferSize(),
154 Producer<String, AbstractMessageTo> producer(
155 Properties defaultProducerProperties,
156 ChatBackendProperties chatBackendProperties,
157 StringSerializer stringSerializer,
158 JsonSerializer<AbstractMessageTo> messageSerializer)
160 Map<String, Object> properties = new HashMap<>();
161 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
163 ProducerConfig.CLIENT_ID_CONFIG,
164 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
165 return new KafkaProducer<>(
172 StringSerializer stringSerializer()
174 return new StringSerializer();
178 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
180 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
181 serializer.configure(
183 JsonSerializer.TYPE_MAPPINGS, typeMappings),
189 Consumer<String, AbstractMessageTo> infoChannelConsumer(
190 Properties defaultConsumerProperties,
191 ChatBackendProperties chatBackendProperties,
192 StringDeserializer stringDeserializer,
193 JsonDeserializer<AbstractMessageTo> messageDeserializer)
195 Map<String, Object> properties = new HashMap<>();
196 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
198 ConsumerConfig.CLIENT_ID_CONFIG,
199 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
201 ConsumerConfig.GROUP_ID_CONFIG,
203 return new KafkaConsumer<>(
206 messageDeserializer);
210 Consumer<String, AbstractMessageTo> dataChannelConsumer(
211 Properties defaultConsumerProperties,
212 ChatBackendProperties chatBackendProperties,
213 StringDeserializer stringDeserializer,
214 JsonDeserializer<AbstractMessageTo> messageDeserializer)
216 Map<String, Object> properties = new HashMap<>();
217 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
219 ConsumerConfig.CLIENT_ID_CONFIG,
220 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
222 ConsumerConfig.GROUP_ID_CONFIG,
224 return new KafkaConsumer<>(
227 messageDeserializer);
231 StringDeserializer stringDeserializer()
233 return new StringDeserializer();
237 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
239 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
240 deserializer.configure(
242 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
243 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
249 String typeMappings ()
252 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
253 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
257 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
259 Properties properties = new Properties();
260 properties.setProperty(
261 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
262 chatBackendProperties.getKafka().getBootstrapServers());
267 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
269 Properties properties = new Properties();
270 properties.setProperty(
271 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
272 chatBackendProperties.getKafka().getBootstrapServers());
273 properties.setProperty(
274 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
276 properties.setProperty(
277 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
285 return ZoneId.systemDefault();