1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.StringDeserializer;
15 import org.apache.kafka.common.serialization.StringSerializer;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.context.annotation.Configuration;
19 import org.springframework.kafka.support.serializer.JsonDeserializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
26 import java.util.List;
28 import java.util.Properties;
31 @ConditionalOnProperty(
32 prefix = "chat.backend",
34 havingValue = "kafka")
36 public class KafkaServicesConfiguration
39 KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
40 ThreadPoolTaskExecutor taskExecutor,
41 ChatRoomChannel chatRoomChannel,
42 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
43 KafkaServicesApplicationRunner.WorkAssignor workAssignor)
45 return new KafkaServicesApplicationRunner(
48 chatRoomChannelConsumer,
53 KafkaServicesApplicationRunner.WorkAssignor workAssignor(
54 ChatBackendProperties properties,
55 ChatRoomChannel chatRoomChannel)
60 List.of(properties.getKafka().getChatRoomChannelTopic());
61 consumer.subscribe(topics, chatRoomChannel);
66 ChatHomeService kafkaChatHome(
67 ChatBackendProperties properties,
68 ChatRoomChannel chatRoomChannel)
70 return new KafkaChatHomeService(
71 properties.getKafka().getNumPartitions(),
76 ChatRoomChannel chatRoomChannel(
77 ChatBackendProperties properties,
78 Producer<String, AbstractMessageTo> chatRoomChannelProducer,
79 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
83 return new ChatRoomChannel(
84 properties.getKafka().getChatRoomChannelTopic(),
85 chatRoomChannelProducer,
86 chatRoomChannelConsumer,
88 properties.getKafka().getNumPartitions(),
89 properties.getChatroomBufferSize(),
94 Producer<String, AbstractMessageTo> chatRoomChannelProducer(
95 Properties defaultProducerProperties,
96 ChatBackendProperties chatBackendProperties,
97 StringSerializer stringSerializer,
98 JsonSerializer<AbstractMessageTo> messageSerializer)
100 Map<String, Object> properties = new HashMap<>();
101 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
103 ProducerConfig.CLIENT_ID_CONFIG,
104 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
105 return new KafkaProducer<>(
112 StringSerializer stringSerializer()
114 return new StringSerializer();
118 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
120 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
121 serializer.configure(
123 JsonSerializer.TYPE_MAPPINGS, typeMappings),
129 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
130 Properties defaultConsumerProperties,
131 ChatBackendProperties chatBackendProperties,
132 StringDeserializer stringDeserializer,
133 JsonDeserializer<AbstractMessageTo> messageDeserializer)
135 Map<String, Object> properties = new HashMap<>();
136 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
138 ConsumerConfig.CLIENT_ID_CONFIG,
139 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
141 ConsumerConfig.GROUP_ID_CONFIG,
143 return new KafkaConsumer<>(
146 messageDeserializer);
150 StringDeserializer stringDeserializer()
152 return new StringDeserializer();
156 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
158 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
159 deserializer.configure(
161 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
162 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
168 String typeMappings ()
171 "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
172 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
176 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
178 Properties properties = new Properties();
179 properties.setProperty(
180 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
181 chatBackendProperties.getKafka().getBootstrapServers());
186 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
188 Properties properties = new Properties();
189 properties.setProperty(
190 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
191 chatBackendProperties.getKafka().getBootstrapServers());
192 properties.setProperty(
193 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
195 properties.setProperty(
196 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
204 return ZoneId.systemDefault();