1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
/**
 * Declares the factory methods for the Kafka-backed chat services:
 * consumers, producer, (de)serializers, channels, work assignors and the
 * task executors/runner that drive the consumers.
 *
 * <p>The class is conditional on a property below the prefix
 * {@code chat.backend} having the value {@code kafka}.
 * NOTE(review): the line naming the property is not visible in this
 * excerpt - confirm which property is checked.
 */
32 @ConditionalOnProperty(
33 prefix = "chat.backend",
35 havingValue = "kafka")
37 public class KafkaServicesConfiguration
/**
 * Creates the {@code ConsumerTaskRunner} from the two channel-specific
 * task executors.
 *
 * @param infoChannelConsumerTaskExecutor executor for the info channel; passed first
 * @param dataChannelConsumerTaskExecutor executor for the data channel; passed second
 * @return a new {@code ConsumerTaskRunner} wrapping both executors
 */
40 ConsumerTaskRunner consumerTaskRunner(
41 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42 ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
44 return new ConsumerTaskRunner(
45 infoChannelConsumerTaskExecutor,
46 dataChannelConsumerTaskExecutor);
/**
 * Creates the {@code ConsumerTaskExecutor} for the info channel.
 *
 * @param taskExecutor thread pool executor handed in by the container
 * @param infoChannel the info channel
 * @param infoChannelConsumer Kafka consumer dedicated to the info channel
 * @param infoChannelWorkAssignor assignor passed as the last constructor argument
 * @return a new {@code ConsumerTaskExecutor}
 */
50 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51 ThreadPoolTaskExecutor taskExecutor,
52 InfoChannel infoChannel,
53 Consumer<String, AbstractMessageTo> infoChannelConsumer,
54 ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
// NOTE(review): the leading constructor arguments are on lines not visible
// in this excerpt - presumably the three remaining parameters; confirm.
56 return new ConsumerTaskExecutor(
60 infoChannelWorkAssignor);
/**
 * Creates the work assignor for the info channel. The visible code builds a
 * list of {@code TopicPartition}s for the info-channel topic and assigns
 * them to the consumer explicitly via {@code assign(...)}.
 *
 * @param properties backend properties; supplies the info-channel topic name
 * @return the work assignor (its construction is on lines not visible here)
 */
64 ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
65 ChatBackendProperties properties)
// NOTE(review): "consumer" is not a parameter of this method - presumably it
// is the argument of a lambda whose header is not visible; confirm.
69 String topic = properties.getKafka().getInfoChannelTopic();
// One TopicPartition per partitionInfo. The origin of partitionInfo is not
// visible - presumably the consumer's partition metadata for the topic.
70 List<TopicPartition> partitions = consumer
74 new TopicPartition(topic, partitionInfo.partition()))
76 consumer.assign(partitions);
/**
 * Creates the {@code ConsumerTaskExecutor} for the data channel.
 *
 * @param taskExecutor thread pool executor handed in by the container
 * @param dataChannel the data channel
 * @param dataChannelConsumer Kafka consumer dedicated to the data channel
 * @param dataChannelWorkAssignor assignor passed as the last constructor argument
 * @return a new {@code ConsumerTaskExecutor}
 */
81 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
82 ThreadPoolTaskExecutor taskExecutor,
83 DataChannel dataChannel,
84 Consumer<String, AbstractMessageTo> dataChannelConsumer,
85 ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
// NOTE(review): the leading constructor arguments are on lines not visible
// in this excerpt - presumably the three remaining parameters; confirm.
87 return new ConsumerTaskExecutor(
91 dataChannelWorkAssignor);
/**
 * Creates the work assignor for the data channel. The visible code
 * subscribes the consumer to the single data-channel topic and passes the
 * {@code DataChannel} itself as second argument of {@code subscribe(...)}.
 *
 * @param properties backend properties; supplies the data-channel topic name
 * @param dataChannel the data channel, handed to {@code subscribe(...)}
 * @return the work assignor (its construction is on lines not visible here)
 */
95 ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
96 ChatBackendProperties properties,
97 DataChannel dataChannel)
// NOTE(review): "consumer" is not a parameter of this method - presumably it
// is the argument of a lambda whose header is not visible; confirm.
101 List<String> topics =
102 List.of(properties.getKafka().getDataChannelTopic());
// Presumably dataChannel acts as the rebalance listener here - confirm
// against the DataChannel class.
103 consumer.subscribe(topics, dataChannel);
/**
 * Creates the Kafka-backed {@code ChatHomeService}.
 *
 * @param properties backend properties; supplies the number of partitions
 * @param infoChannel the info channel
 * @param dataChannel the data channel
 * @return a new {@code KafkaChatHomeService}
 */
108 ChatHomeService kafkaChatHome(
109 ChatBackendProperties properties,
110 InfoChannel infoChannel,
111 DataChannel dataChannel)
// The partition count is the first constructor argument; the remaining
// arguments are on lines not visible in this excerpt.
113 return new KafkaChatHomeService(
114 properties.getKafka().getNumPartitions(),
/**
 * Creates the {@code InfoChannel}.
 *
 * @param properties backend properties; supplies the info-channel topic name
 * @param producer shared Kafka producer
 * @param infoChannelConsumer Kafka consumer dedicated to the info channel
 * @return a new {@code InfoChannel}
 */
120 InfoChannel infoChannel(
121 ChatBackendProperties properties,
122 Producer<String, AbstractMessageTo> producer,
123 Consumer<String, AbstractMessageTo> infoChannelConsumer)
// The topic name is passed first and the consumer last; the argument(s) in
// between are not visible in this excerpt.
125 return new InfoChannel(
126 properties.getKafka().getInfoChannelTopic(),
128 infoChannelConsumer);
/**
 * Creates the {@code DataChannel}.
 *
 * @param properties backend properties; supplies the data-channel topic name,
 *        the number of partitions and the chat-room buffer size
 * @param producer shared Kafka producer
 * @param dataChannelConsumer Kafka consumer dedicated to the data channel
 * @return a new {@code DataChannel}
 */
132 DataChannel dataChannel(
133 ChatBackendProperties properties,
134 Producer<String, AbstractMessageTo> producer,
135 Consumer<String, AbstractMessageTo> dataChannelConsumer,
// NOTE(review): the parameter list continues on lines not visible in this
// excerpt, as do several of the constructor arguments below.
139 return new DataChannel(
140 properties.getKafka().getDataChannelTopic(),
144 properties.getKafka().getNumPartitions(),
145 properties.getChatroomBufferSize(),
/**
 * Creates the Kafka producer used to send {@code AbstractMessageTo} values
 * with {@code String} keys.
 *
 * @param defaultProducerProperties base settings, copied into the producer config
 * @param chatBackendProperties supplies the client-id prefix
 * @param stringSerializer key serializer
 * @param messageSerializer value serializer
 * @return a new {@code KafkaProducer}
 */
150 Producer<String, AbstractMessageTo> producer(
151 Properties defaultProducerProperties,
152 ChatBackendProperties chatBackendProperties,
153 StringSerializer stringSerializer,
154 JsonSerializer<AbstractMessageTo> messageSerializer)
// Copy the defaults into a String-keyed map (keys converted via toString()).
156 Map<String, Object> properties = new HashMap<>();
157 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id = configured prefix + "_PRODUCER". The call that stores this
// entry is on a line not visible in this excerpt.
159 ProducerConfig.CLIENT_ID_CONFIG,
160 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
// Constructor arguments are not visible - presumably the map and the two
// serializers; confirm.
161 return new KafkaProducer<>(
/**
 * Provides a fresh {@code StringSerializer} instance.
 *
 * @return a new {@code StringSerializer}
 */
168 StringSerializer stringSerializer()
170 return new StringSerializer();
/**
 * Creates the JSON serializer for {@code AbstractMessageTo} values and
 * configures it with the given type mappings.
 *
 * @param typeMappings value stored under {@code JsonSerializer.TYPE_MAPPINGS}
 * @return the serializer (the return statement is not visible in this excerpt)
 */
174 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
176 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
// The remaining configure(...) arguments are on lines not visible here.
177 serializer.configure(
179 JsonSerializer.TYPE_MAPPINGS, typeMappings),
/**
 * Creates the Kafka consumer for the info channel.
 *
 * @param defaultConsumerProperties base settings, copied into the consumer config
 * @param chatBackendProperties supplies the client-id prefix
 * @param stringDeserializer key deserializer
 * @param messageDeserializer value deserializer; passed as last constructor argument
 * @return a new {@code KafkaConsumer}
 */
185 Consumer<String, AbstractMessageTo> infoChannelConsumer(
186 Properties defaultConsumerProperties,
187 ChatBackendProperties chatBackendProperties,
188 StringDeserializer stringDeserializer,
189 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copy the defaults into a String-keyed map (keys converted via toString()).
191 Map<String, Object> properties = new HashMap<>();
192 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id = configured prefix + "_INFO_CHANNEL_CONSUMER".
194 ConsumerConfig.CLIENT_ID_CONFIG,
195 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
// NOTE(review): the group-id value is on a line not visible in this excerpt.
197 ConsumerConfig.GROUP_ID_CONFIG,
199 return new KafkaConsumer<>(
202 messageDeserializer);
/**
 * Creates the Kafka consumer for the data channel.
 *
 * @param defaultConsumerProperties base settings, copied into the consumer config
 * @param chatBackendProperties supplies the client-id prefix
 * @param stringDeserializer key deserializer
 * @param messageDeserializer value deserializer; passed as last constructor argument
 * @return a new {@code KafkaConsumer}
 */
206 Consumer<String, AbstractMessageTo> dataChannelConsumer(
207 Properties defaultConsumerProperties,
208 ChatBackendProperties chatBackendProperties,
209 StringDeserializer stringDeserializer,
210 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copy the defaults into a String-keyed map (keys converted via toString()).
212 Map<String, Object> properties = new HashMap<>();
213 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id = configured prefix + "_DATA_CHANNEL_CONSUMER".
215 ConsumerConfig.CLIENT_ID_CONFIG,
216 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
// NOTE(review): the group-id value is on a line not visible in this excerpt.
218 ConsumerConfig.GROUP_ID_CONFIG,
220 return new KafkaConsumer<>(
223 messageDeserializer);
/**
 * Provides a fresh {@code StringDeserializer} instance.
 *
 * @return a new {@code StringDeserializer}
 */
227 StringDeserializer stringDeserializer()
229 return new StringDeserializer();
/**
 * Creates the JSON deserializer for {@code AbstractMessageTo} values.
 * The visible configuration sets the trusted packages to this class's own
 * package name and registers the given type mappings.
 *
 * @param typeMappings value stored under {@code JsonDeserializer.TYPE_MAPPINGS}
 * @return the deserializer (the return statement is not visible in this excerpt)
 */
233 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
235 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
// Further configure(...) arguments may sit on lines not visible here.
236 deserializer.configure(
238 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
239 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
/**
 * Builds the type-mapping string: comma-separated {@code token:className}
 * pairs that map {@code event_chatroom_created} and
 * {@code event_chatmessage_received} to the canonical names of their
 * message classes.
 *
 * @return the mapping string (the {@code return} keyword is presumably on a
 *         line not visible in this excerpt)
 */
245 String typeMappings ()
248 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
249 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
/**
 * Builds the base producer settings; the visible code sets only the
 * bootstrap servers.
 *
 * @param chatBackendProperties supplies the bootstrap-server list
 * @return the properties (the return statement is not visible in this excerpt)
 */
253 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
255 Properties properties = new Properties();
256 properties.setProperty(
257 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
258 chatBackendProperties.getKafka().getBootstrapServers());
/**
 * Builds the base consumer settings: bootstrap servers, the auto-commit
 * flag and the offset-reset policy.
 *
 * @param chatBackendProperties supplies the bootstrap-server list
 * @return the properties (the return statement is not visible in this excerpt)
 */
263 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
265 Properties properties = new Properties();
266 properties.setProperty(
267 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
268 chatBackendProperties.getKafka().getBootstrapServers());
// NOTE(review): the values assigned to the next two keys are on lines not
// visible in this excerpt - confirm them against the full file.
269 properties.setProperty(
270 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
272 properties.setProperty(
273 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
281 return ZoneId.systemDefault();