1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
32 @ConditionalOnProperty(
33 prefix = "chat.backend",
35 havingValue = "kafka")
37 public class KafkaServicesConfiguration
40 ConsumerTaskRunner consumerTaskRunner(
41 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42 ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
43 InfoChannel infoChannel)
45 return new ConsumerTaskRunner(
46 infoChannelConsumerTaskExecutor,
47 dataChannelConsumerTaskExecutor,
52 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
53 ThreadPoolTaskExecutor taskExecutor,
54 InfoChannel infoChannel,
55 Consumer<String, AbstractMessageTo> infoChannelConsumer,
56 WorkAssignor infoChannelWorkAssignor)
58 return new ConsumerTaskExecutor(
62 infoChannelWorkAssignor);
66 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
70 String topic = properties.getKafka().getInfoChannelTopic();
71 List<TopicPartition> partitions = consumer
75 new TopicPartition(topic, partitionInfo.partition()))
77 consumer.assign(partitions);
82 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
83 ThreadPoolTaskExecutor taskExecutor,
84 DataChannel dataChannel,
85 Consumer<String, AbstractMessageTo> dataChannelConsumer,
86 WorkAssignor dataChannelWorkAssignor)
88 return new ConsumerTaskExecutor(
92 dataChannelWorkAssignor);
96 WorkAssignor dataChannelWorkAssignor(
97 ChatBackendProperties properties,
98 DataChannel dataChannel)
102 List<String> topics =
103 List.of(properties.getKafka().getDataChannelTopic());
104 consumer.subscribe(topics, dataChannel);
109 ChatHomeService kafkaChatHome(
110 ChatBackendProperties properties,
111 InfoChannel infoChannel,
112 DataChannel dataChannel)
114 return new KafkaChatHomeService(
115 properties.getKafka().getNumPartitions(),
121 InfoChannel infoChannel(
122 ChatBackendProperties properties,
123 Producer<String, AbstractMessageTo> producer,
124 Consumer<String, AbstractMessageTo> infoChannelConsumer)
126 return new InfoChannel(
127 properties.getKafka().getInfoChannelTopic(),
130 properties.getKafka().getInstanceUri());
134 DataChannel dataChannel(
135 ChatBackendProperties properties,
136 Producer<String, AbstractMessageTo> producer,
137 Consumer<String, AbstractMessageTo> dataChannelConsumer,
140 InfoChannel infoChannel)
142 return new DataChannel(
143 properties.getInstanceId(),
144 properties.getKafka().getDataChannelTopic(),
148 properties.getKafka().getNumPartitions(),
149 properties.getChatroomBufferSize(),
155 Producer<String, AbstractMessageTo> producer(
156 Properties defaultProducerProperties,
157 ChatBackendProperties chatBackendProperties,
158 StringSerializer stringSerializer,
159 JsonSerializer<AbstractMessageTo> messageSerializer)
161 Map<String, Object> properties = new HashMap<>();
162 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
164 ProducerConfig.CLIENT_ID_CONFIG,
165 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
166 return new KafkaProducer<>(
173 StringSerializer stringSerializer()
175 return new StringSerializer();
179 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
181 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
182 serializer.configure(
184 JsonSerializer.TYPE_MAPPINGS, typeMappings),
190 Consumer<String, AbstractMessageTo> infoChannelConsumer(
191 Properties defaultConsumerProperties,
192 ChatBackendProperties chatBackendProperties,
193 StringDeserializer stringDeserializer,
194 JsonDeserializer<AbstractMessageTo> messageDeserializer)
196 Map<String, Object> properties = new HashMap<>();
197 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
199 ConsumerConfig.CLIENT_ID_CONFIG,
200 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
202 ConsumerConfig.GROUP_ID_CONFIG,
204 return new KafkaConsumer<>(
207 messageDeserializer);
211 Consumer<String, AbstractMessageTo> dataChannelConsumer(
212 Properties defaultConsumerProperties,
213 ChatBackendProperties chatBackendProperties,
214 StringDeserializer stringDeserializer,
215 JsonDeserializer<AbstractMessageTo> messageDeserializer)
217 Map<String, Object> properties = new HashMap<>();
218 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
220 ConsumerConfig.CLIENT_ID_CONFIG,
221 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
223 ConsumerConfig.GROUP_ID_CONFIG,
225 return new KafkaConsumer<>(
228 messageDeserializer);
232 StringDeserializer stringDeserializer()
234 return new StringDeserializer();
238 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
240 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
241 deserializer.configure(
243 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
244 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
250 String typeMappings ()
253 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
254 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
258 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
260 Properties properties = new Properties();
261 properties.setProperty(
262 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
263 chatBackendProperties.getKafka().getBootstrapServers());
268 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
270 Properties properties = new Properties();
271 properties.setProperty(
272 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
273 chatBackendProperties.getKafka().getBootstrapServers());
274 properties.setProperty(
275 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
277 properties.setProperty(
278 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
286 return ZoneId.systemDefault();