1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
/**
 * Wires up the Kafka-based implementation of the chat backend: the Kafka
 * producer and consumers, their (de)serializers, the info and data channels,
 * the work assignors, the consumer task executors and the
 * {@link ChatHomeService}.
 * <p>
 * The condition below matches a property under the {@code chat.backend}
 * prefix whose value is {@code kafka}.
 * NOTE(review): the name of the checked property is not shown alongside the
 * prefix/value here — confirm against the full annotation.
 */
32 @ConditionalOnProperty(
33 prefix = "chat.backend",
35 havingValue = "kafka")
37 public class KafkaServicesConfiguration
/**
 * Creates the {@link ConsumerTaskRunner} from the task executors of both
 * channels.
 *
 * @param infoChannelConsumerTaskExecutor executor for the info channel, passed as first constructor argument
 * @param dataChannelConsumerTaskExecutor executor for the data channel, passed as second constructor argument
 * @return a new {@code ConsumerTaskRunner} wrapping both executors
 */
40 ConsumerTaskRunner consumerTaskRunner(
41 ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
42 ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
44 return new ConsumerTaskRunner(
45 infoChannelConsumerTaskExecutor,
46 dataChannelConsumerTaskExecutor);
/**
 * Creates the {@link ConsumerTaskExecutor} for the info channel.
 *
 * @param taskExecutor thread pool task executor
 * @param infoChannel the info channel
 * @param infoChannelConsumer Kafka consumer dedicated to the info channel
 * @param infoChannelWorkAssignor work assignor, passed as the last constructor argument
 * @return a new {@code ConsumerTaskExecutor}
 */
50 ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
51 ThreadPoolTaskExecutor taskExecutor,
52 InfoChannel infoChannel,
53 Consumer<String, AbstractMessageTo> infoChannelConsumer,
54 WorkAssignor infoChannelWorkAssignor)
56 return new ConsumerTaskExecutor(
// NOTE(review): the leading constructor arguments are presumably taskExecutor,
// infoChannel and infoChannelConsumer — confirm order against ConsumerTaskExecutor.
60 infoChannelWorkAssignor);
/**
 * Provides the {@link WorkAssignor} for the info channel. The assignor builds
 * one {@link TopicPartition} per partition info of the configured info-channel
 * topic and hands the resulting list to {@code consumer.assign(...)}.
 *
 * @param properties backend properties; supplies the info-channel topic name
 * @return the work assignor for the info channel
 */
64 WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
// Topic name is read from the Kafka section of the backend properties.
68 String topic = properties.getKafka().getInfoChannelTopic();
// NOTE(review): `consumer` is presumably the Kafka consumer handed to the
// assignor, and the partition infos presumably come from a lookup on that
// consumer for `topic` — confirm.
69 List<TopicPartition> partitions = consumer
73 new TopicPartition(topic, partitionInfo.partition()))
// Explicit partition assignment (as opposed to subscribe(...), which the
// data-channel assignor uses).
75 consumer.assign(partitions);
/**
 * Creates the {@link ConsumerTaskExecutor} for the data channel.
 *
 * @param taskExecutor thread pool task executor
 * @param dataChannel the data channel
 * @param dataChannelConsumer Kafka consumer dedicated to the data channel
 * @param dataChannelWorkAssignor work assignor, passed as the last constructor argument
 * @return a new {@code ConsumerTaskExecutor}
 */
80 ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
81 ThreadPoolTaskExecutor taskExecutor,
82 DataChannel dataChannel,
83 Consumer<String, AbstractMessageTo> dataChannelConsumer,
84 WorkAssignor dataChannelWorkAssignor)
86 return new ConsumerTaskExecutor(
// NOTE(review): the leading constructor arguments are presumably taskExecutor,
// dataChannel and dataChannelConsumer — confirm order against ConsumerTaskExecutor.
90 dataChannelWorkAssignor);
/**
 * Provides the {@link WorkAssignor} for the data channel. The assignor
 * subscribes the consumer to the configured data-channel topic and passes the
 * {@link DataChannel} as second argument of {@code subscribe(...)}.
 *
 * @param properties backend properties; supplies the data-channel topic name
 * @param dataChannel the data channel, handed to {@code consumer.subscribe(...)}
 * @return the work assignor for the data channel
 */
94 WorkAssignor dataChannelWorkAssignor(
95 ChatBackendProperties properties,
96 DataChannel dataChannel)
// Single-element topic list built from the Kafka section of the properties.
100 List<String> topics =
101 List.of(properties.getKafka().getDataChannelTopic());
// NOTE(review): dataChannel presumably acts as the rebalance listener of the
// subscription — confirm that DataChannel implements ConsumerRebalanceListener.
102 consumer.subscribe(topics, dataChannel);
/**
 * Creates the Kafka-backed {@link ChatHomeService}.
 *
 * @param properties backend properties; supplies the number of partitions
 * @param infoChannel the info channel
 * @param dataChannel the data channel
 * @return a new {@code KafkaChatHomeService}
 */
107 ChatHomeService kafkaChatHome(
108 ChatBackendProperties properties,
109 InfoChannel infoChannel,
110 DataChannel dataChannel)
112 return new KafkaChatHomeService(
// First constructor argument: the configured number of partitions.
// NOTE(review): infoChannel/dataChannel are presumably the remaining
// constructor arguments — confirm.
113 properties.getKafka().getNumPartitions(),
/**
 * Creates the {@link InfoChannel} for the configured info-channel topic.
 *
 * @param properties backend properties; supplies the info-channel topic name
 * @param producer Kafka producer for outgoing messages
 * @param infoChannelConsumer Kafka consumer of the info channel, passed as the last constructor argument
 * @return a new {@code InfoChannel}
 */
119 InfoChannel infoChannel(
120 ChatBackendProperties properties,
121 Producer<String, AbstractMessageTo> producer,
122 Consumer<String, AbstractMessageTo> infoChannelConsumer)
124 return new InfoChannel(
125 properties.getKafka().getInfoChannelTopic(),
// NOTE(review): the argument between topic and consumer is presumably
// `producer` — confirm.
127 infoChannelConsumer);
/**
 * Creates the {@link DataChannel} for the configured data-channel topic.
 * The constructor receives, among other arguments, the topic name, the number
 * of partitions and the chat-room buffer size read from the properties.
 *
 * @param properties backend properties (topic, partition count, buffer size)
 * @param producer Kafka producer for outgoing messages
 * @param dataChannelConsumer Kafka consumer of the data channel
 * @return a new {@code DataChannel}
 */
131 DataChannel dataChannel(
132 ChatBackendProperties properties,
133 Producer<String, AbstractMessageTo> producer,
134 Consumer<String, AbstractMessageTo> dataChannelConsumer,
// NOTE(review): the parameter list continues beyond dataChannelConsumer —
// document the remaining parameters once confirmed.
138 return new DataChannel(
139 properties.getKafka().getDataChannelTopic(),
143 properties.getKafka().getNumPartitions(),
144 properties.getChatroomBufferSize(),
/**
 * Creates the Kafka {@link Producer} used to send {@link AbstractMessageTo}
 * messages with {@code String} keys.
 *
 * @param defaultProducerProperties base producer settings, copied into the effective configuration
 * @param chatBackendProperties backend properties; supplies the client-id prefix
 * @param stringSerializer key serializer
 * @param messageSerializer value serializer
 * @return a new {@link KafkaProducer}
 */
149 Producer<String, AbstractMessageTo> producer(
150 Properties defaultProducerProperties,
151 ChatBackendProperties chatBackendProperties,
152 StringSerializer stringSerializer,
153 JsonSerializer<AbstractMessageTo> messageSerializer)
// Copy the defaults into a fresh map (keys converted via toString()), so the
// shared default Properties object is not modified by the additions below.
155 Map<String, Object> properties = new HashMap<>();
156 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: configured prefix plus the fixed suffix "_PRODUCER".
158 ProducerConfig.CLIENT_ID_CONFIG,
159 chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
// NOTE(review): presumably constructed from the map and the two serializers — confirm.
160 return new KafkaProducer<>(
/**
 * Creates a {@link StringSerializer}.
 *
 * @return a new {@code StringSerializer} instance
 */
167 StringSerializer stringSerializer()
169 return new StringSerializer();
/**
 * Creates the {@link JsonSerializer} for {@link AbstractMessageTo} messages
 * and configures it with the given type mappings.
 *
 * @param typeMappings value for {@code JsonSerializer.TYPE_MAPPINGS}
 * @return the configured serializer
 */
173 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
175 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
176 serializer.configure(
// The type mappings are passed as part of the configuration.
178 JsonSerializer.TYPE_MAPPINGS, typeMappings),
/**
 * Creates the Kafka {@link Consumer} dedicated to the info channel.
 *
 * @param defaultConsumerProperties base consumer settings, copied into the effective configuration
 * @param chatBackendProperties backend properties; supplies the client-id prefix
 * @param stringDeserializer key deserializer
 * @param messageDeserializer value deserializer, passed as the last constructor argument
 * @return a new {@link KafkaConsumer}
 */
184 Consumer<String, AbstractMessageTo> infoChannelConsumer(
185 Properties defaultConsumerProperties,
186 ChatBackendProperties chatBackendProperties,
187 StringDeserializer stringDeserializer,
188 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copy the defaults into a fresh map (keys converted via toString()), so the
// shared default Properties object is not modified by the additions below.
190 Map<String, Object> properties = new HashMap<>();
191 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: configured prefix plus the fixed suffix "_INFO_CHANNEL_CONSUMER".
193 ConsumerConfig.CLIENT_ID_CONFIG,
194 chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
// A group id is configured as well.
// NOTE(review): the group-id value is not shown next to this key — confirm it.
196 ConsumerConfig.GROUP_ID_CONFIG,
198 return new KafkaConsumer<>(
201 messageDeserializer);
/**
 * Creates the Kafka {@link Consumer} dedicated to the data channel.
 *
 * @param defaultConsumerProperties base consumer settings, copied into the effective configuration
 * @param chatBackendProperties backend properties; supplies the client-id prefix
 * @param stringDeserializer key deserializer
 * @param messageDeserializer value deserializer, passed as the last constructor argument
 * @return a new {@link KafkaConsumer}
 */
205 Consumer<String, AbstractMessageTo> dataChannelConsumer(
206 Properties defaultConsumerProperties,
207 ChatBackendProperties chatBackendProperties,
208 StringDeserializer stringDeserializer,
209 JsonDeserializer<AbstractMessageTo> messageDeserializer)
// Copy the defaults into a fresh map (keys converted via toString()), so the
// shared default Properties object is not modified by the additions below.
211 Map<String, Object> properties = new HashMap<>();
212 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// Client id: configured prefix plus the fixed suffix "_DATA_CHANNEL_CONSUMER".
214 ConsumerConfig.CLIENT_ID_CONFIG,
215 chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
// A group id is configured as well.
// NOTE(review): the group-id value is not shown next to this key — confirm it.
217 ConsumerConfig.GROUP_ID_CONFIG,
219 return new KafkaConsumer<>(
222 messageDeserializer);
/**
 * Creates a {@link StringDeserializer}.
 *
 * @return a new {@code StringDeserializer} instance
 */
226 StringDeserializer stringDeserializer()
228 return new StringDeserializer();
/**
 * Creates the {@link JsonDeserializer} for {@link AbstractMessageTo} messages
 * and configures its trusted packages and type mappings.
 *
 * @param typeMappings value for {@code JsonDeserializer.TYPE_MAPPINGS}
 * @return the configured deserializer
 */
232 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
234 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
235 deserializer.configure(
// Trust the package of this configuration class.
// NOTE(review): the mapped message classes are imported from the
// ...kafka.messages.data / .info packages — confirm that trusting only this
// class's own package name is sufficient for them.
237 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
238 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
/**
 * Builds the type-mapping string shared by the JSON serializer and
 * deserializer: a comma-separated list of {@code token:canonicalClassName}
 * pairs, mapping {@code event_chatroom_created} to
 * {@link EventChatRoomCreated} and {@code event_chatmessage_received} to
 * {@link EventChatMessageReceivedTo}.
 *
 * @return the type-mapping string (NOTE(review): presumably the concatenation below is returned as-is — confirm)
 */
244 String typeMappings ()
247 "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
248 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
/**
 * Builds the base settings for Kafka producers: the bootstrap servers taken
 * from the backend properties.
 *
 * @param chatBackendProperties backend properties; supplies the bootstrap servers
 * @return the producer defaults (NOTE(review): presumably the {@code properties} object built below — confirm)
 */
252 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
254 Properties properties = new Properties();
255 properties.setProperty(
256 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
257 chatBackendProperties.getKafka().getBootstrapServers());
/**
 * Builds the base settings for Kafka consumers: the bootstrap servers taken
 * from the backend properties, plus explicit values for the auto-commit and
 * auto-offset-reset settings.
 *
 * @param chatBackendProperties backend properties; supplies the bootstrap servers
 * @return the consumer defaults (NOTE(review): presumably the {@code properties} object built below — confirm)
 */
262 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
264 Properties properties = new Properties();
265 properties.setProperty(
266 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
267 chatBackendProperties.getKafka().getBootstrapServers());
// NOTE(review): the value assigned to enable.auto.commit is not shown next to
// the key — confirm it matches the offset handling of the channels.
268 properties.setProperty(
269 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
// NOTE(review): the value assigned to auto.offset.reset is not shown next to
// the key — confirm the intended start position for new consumers.
271 properties.setProperty(
272 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
280 return ZoneId.systemDefault();