1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
32 @ConditionalOnProperty(
33 prefix = "chat.backend",
35 havingValue = "kafka")
37 public class KafkaServicesConfiguration
// Builds the ConsumerTaskRunner from the two channel executors.
// The info-channel executor is passed as the first constructor argument,
// the data-channel executor as the second.
40 ConsumerTaskRunner consumerTaskRunner(
41 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42 ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
44 return new ConsumerTaskRunner(
45 infoChannelConsumerTaskExecutor,
46 dataChannelConsumerTaskExecutor);
// Creates the ConsumerTaskExecutor that serves the info channel.
// NOTE(review): only the last constructor argument (the info-channel work
// assignor) is visible in this view; presumably taskExecutor, infoChannel and
// infoChannelConsumer are passed before it -- confirm against the full file.
50 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51 ThreadPoolTaskExecutor taskExecutor,
52 InfoChannel infoChannel,
53 Consumer<String, AbstractMessageTo> infoChannelConsumer,
54 WorkAssignor infoChannelWorkAssignor)
56 return new ConsumerTaskExecutor(
60 infoChannelWorkAssignor);
// Provides the WorkAssignor used for the info-channel consumer.
// NOTE(review): the statement that introduces `consumer` is not visible in this
// view; presumably the WorkAssignor is a lambda that receives the consumer -- confirm.
64 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
// Name of the info-channel topic, read from the Kafka section of the configuration.
68 String topic = properties.getKafka().getInfoChannelTopic();
// Turns each partitionInfo into a TopicPartition of that topic.
// NOTE(review): the call that yields the partitionInfo elements is not visible
// here (presumably consumer.partitionsFor(topic)) -- confirm.
69 List<TopicPartition> partitions = consumer
73 new TopicPartition(topic, partitionInfo.partition()))
// Hands the collected partitions to the consumer via assign(), i.e. the
// partitions are set explicitly instead of subscribing to the topic.
75 consumer.assign(partitions);
// Creates the ConsumerTaskExecutor that serves the data channel.
// NOTE(review): only the last constructor argument (the data-channel work
// assignor) is visible in this view; presumably taskExecutor, dataChannel and
// dataChannelConsumer are passed before it -- confirm against the full file.
80 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
81 ThreadPoolTaskExecutor taskExecutor,
82 DataChannel dataChannel,
83 Consumer<String, AbstractMessageTo> dataChannelConsumer,
84 WorkAssignor dataChannelWorkAssignor)
86 return new ConsumerTaskExecutor(
90 dataChannelWorkAssignor);
// Provides the WorkAssignor used for the data-channel consumer.
// NOTE(review): the statement that introduces `consumer` is not visible in this
// view; presumably the WorkAssignor is a lambda that receives the consumer -- confirm.
94 WorkAssignor dataChannelWorkAssignor(
95 ChatBackendProperties properties,
96 DataChannel dataChannel)
// Single-element list holding the configured data-channel topic.
100 List<String> topics =
101 List.of(properties.getKafka().getDataChannelTopic());
// Subscribes to the topic and passes the DataChannel as second argument of
// subscribe() (presumably it acts as the rebalance listener -- confirm).
102 consumer.subscribe(topics, dataChannel);
// Creates the Kafka-backed ChatHomeService implementation (KafkaChatHomeService).
// The configured number of partitions is the first constructor argument.
// NOTE(review): the remaining constructor arguments are not visible in this
// view; presumably infoChannel and dataChannel are among them -- confirm.
107 ChatHomeService kafkaChatHome(
108 ChatBackendProperties properties,
109 InfoChannel infoChannel,
110 DataChannel dataChannel)
112 return new KafkaChatHomeService(
113 properties.getKafka().getNumPartitions(),
// Creates the InfoChannel.
// The first constructor argument is the configured info-channel topic, the last
// one is the info-channel consumer.
// NOTE(review): the argument in between is not visible in this view
// (presumably the producer) -- confirm.
119 InfoChannel infoChannel(
120 ChatBackendProperties properties,
121 Producer<String, AbstractMessageTo> producer,
122 Consumer<String, AbstractMessageTo> infoChannelConsumer)
124 return new InfoChannel(
125 properties.getKafka().getInfoChannelTopic(),
127 infoChannelConsumer);
// Creates the DataChannel.
// Visible constructor arguments: the configured data-channel topic, the number
// of partitions and the chat-room buffer size, all read from the configuration.
// NOTE(review): further parameters and constructor arguments exist but are not
// visible in this view -- consult the full file for the complete list and order.
131 DataChannel dataChannel(
132 ChatBackendProperties properties,
133 Producer<String, AbstractMessageTo> producer,
134 Consumer<String, AbstractMessageTo> dataChannelConsumer,
137 InfoChannel infoChannel)
139 return new DataChannel(
140 properties.getKafka().getDataChannelTopic(),
144 properties.getKafka().getNumPartitions(),
145 properties.getChatroomBufferSize(),
// Creates the Kafka producer used for AbstractMessageTo values with String keys.
151 Producer<String, AbstractMessageTo> producer(
152 Properties defaultProducerProperties,
153 ChatBackendProperties chatBackendProperties,
154 StringSerializer stringSerializer,
155 JsonSerializer<AbstractMessageTo> messageSerializer)
// Copies the default producer properties into a fresh Map<String, Object>,
// converting every key with toString().
157 Map<String, Object> properties = new HashMap<>();
158 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: the configured client-id prefix followed by "_PRODUCER".
// NOTE(review): the call these two lines are arguments of is not visible in
// this view (presumably properties.put) -- confirm.
160 ProducerConfig.CLIENT_ID_CONFIG,
161 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
// NOTE(review): the KafkaProducer constructor arguments are not visible here;
// presumably the property map and the two injected serializers -- confirm.
162 return new KafkaProducer<>(
// Supplies a new StringSerializer instance.
169 StringSerializer stringSerializer()
171 return new StringSerializer();
// Creates the JsonSerializer for AbstractMessageTo and configures it with the
// given type mappings (JsonSerializer.TYPE_MAPPINGS).
// NOTE(review): the remaining arguments of configure() and the return statement
// are not visible in this view.
175 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
177 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
178 serializer.configure(
180 JsonSerializer.TYPE_MAPPINGS, typeMappings),
// Creates the Kafka consumer that is used for the info channel.
186 Consumer<String, AbstractMessageTo> infoChannelConsumer(
187 Properties defaultConsumerProperties,
188 ChatBackendProperties chatBackendProperties,
189 StringDeserializer stringDeserializer,
190 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copies the default consumer properties into a fresh Map<String, Object>,
// converting every key with toString().
192 Map<String, Object> properties = new HashMap<>();
193 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: the configured client-id prefix followed by "_INFO_CHANNEL_CONSUMER".
195 ConsumerConfig.CLIENT_ID_CONFIG,
196 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
// NOTE(review): the value stored under the group-id key is not visible in this view.
198 ConsumerConfig.GROUP_ID_CONFIG,
// The injected JSON deserializer is the last constructor argument; the
// preceding arguments are not visible here (presumably the property map and
// the string deserializer -- confirm).
200 return new KafkaConsumer<>(
203 messageDeserializer);
// Creates the Kafka consumer that is used for the data channel.
207 Consumer<String, AbstractMessageTo> dataChannelConsumer(
208 Properties defaultConsumerProperties,
209 ChatBackendProperties chatBackendProperties,
210 StringDeserializer stringDeserializer,
211 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copies the default consumer properties into a fresh Map<String, Object>,
// converting every key with toString().
213 Map<String, Object> properties = new HashMap<>();
214 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: the configured client-id prefix followed by "_DATA_CHANNEL_CONSUMER".
216 ConsumerConfig.CLIENT_ID_CONFIG,
217 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
// NOTE(review): the value stored under the group-id key is not visible in this view.
219 ConsumerConfig.GROUP_ID_CONFIG,
// The injected JSON deserializer is the last constructor argument; the
// preceding arguments are not visible here (presumably the property map and
// the string deserializer -- confirm).
221 return new KafkaConsumer<>(
224 messageDeserializer);
// Supplies a new StringDeserializer instance.
228 StringDeserializer stringDeserializer()
230 return new StringDeserializer();
// Creates the JsonDeserializer for AbstractMessageTo and configures its trusted
// packages and type mappings.
// NOTE(review): the remaining arguments of configure() and the return statement
// are not visible in this view.
234 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
236 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
237 deserializer.configure(
// Trusts the package of this configuration class.
// NOTE(review): the message classes imported by this file live in sub-packages
// (…kafka.messages.data / …kafka.messages.info) -- confirm that trusting this
// package, together with the type mappings, covers them.
239 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
240 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
// Defines the type-mapping string that is injected into the JSON serializer and
// deserializer factory methods of this class.
// It consists of two comma-separated entries of the form "<token>:<canonical class name>":
// event_chatroom_created -> EventChatRoomCreated and
// event_chatmessage_received -> EventChatMessageReceivedTo.
246 String typeMappings ()
249 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
250 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
// Builds the base Properties for Kafka producers.
// Sets the bootstrap servers from the Kafka section of the configuration.
// NOTE(review): anything after this setProperty call (including the return
// statement) is not visible in this view.
254 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
256 Properties properties = new Properties();
257 properties.setProperty(
258 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
259 chatBackendProperties.getKafka().getBootstrapServers());
// Builds the base Properties shared by the Kafka consumers created in this class.
264 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
266 Properties properties = new Properties();
// Bootstrap servers are read from the Kafka section of the configuration.
267 properties.setProperty(
268 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
269 chatBackendProperties.getKafka().getBootstrapServers());
// NOTE(review): the value assigned to the auto-commit setting is not visible in this view.
270 properties.setProperty(
271 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
// NOTE(review): the value assigned to the offset-reset setting is not visible in this view.
273 properties.setProperty(
274 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
282 return ZoneId.systemDefault();