1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
27 import java.util.Properties;
30 @ConditionalOnProperty(
31 prefix = "chat.backend",
33 havingValue = "kafka")
35 public class KafkaServicesConfiguration
38 ChatHome kafkaChatHome(
39 ShardingStrategy shardingStrategy,
40 ChatMessageChannel chatMessageChannel)
42 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
46 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
48 return new KafkaChatRoomFactory(chatRoomChannel);
52 ChatRoomChannel chatRoomChannel(
53 ChatBackendProperties properties,
54 Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
55 Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
56 ShardingStrategy shardingStrategy,
57 ChatMessageChannel chatMessageChannel,
60 return new ChatRoomChannel(
61 properties.getKafka().getChatroomChannelTopic(),
62 chatRoomChannelProducer,
63 chatRoomChannelConsumer,
67 properties.getChatroomBufferSize());
71 Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer(
72 Properties defaultProducerProperties,
73 ChatBackendProperties chatBackendProperties,
74 IntegerSerializer integerSerializer,
75 JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
77 Map<String, Object> properties = new HashMap<>();
78 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
80 ProducerConfig.CLIENT_ID_CONFIG,
81 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
82 return new KafkaProducer<>(
89 IntegerSerializer integerSerializer()
91 return new IntegerSerializer();
95 JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
97 JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
99 Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
105 Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer(
106 Properties defaultConsumerProperties,
107 ChatBackendProperties chatBackendProperties,
108 IntegerDeserializer integerDeserializer,
109 JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
111 Map<String, Object> properties = new HashMap<>();
112 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
114 ConsumerConfig.CLIENT_ID_CONFIG,
115 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
117 ConsumerConfig.GROUP_ID_CONFIG,
118 "chat_room_channel");
119 return new KafkaConsumer<>(
122 chatRoomDeserializer);
126 IntegerDeserializer integerDeserializer()
128 return new IntegerDeserializer();
132 JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
134 JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
135 deserializer.configure(
137 JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
138 JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
139 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
145 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
147 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
151 ChatMessageChannel chatMessageChannel(
152 ChatBackendProperties properties,
153 Producer<String, AbstractTo> chatMessageChannelProducer,
154 Consumer<String, AbstractTo> chatMessageChannelConsumer,
157 return new ChatMessageChannel(
158 properties.getKafka().getMessageChannelTopic(),
159 chatMessageChannelProducer,
160 chatMessageChannelConsumer,
162 properties.getKafka().getNumPartitions());
166 Producer<String, AbstractTo> chatMessageChannelProducer(
167 Properties defaultProducerProperties,
168 ChatBackendProperties chatBackendProperties,
169 StringSerializer stringSerializer,
170 JsonSerializer<AbstractTo> messageSerializer)
172 Map<String, Object> properties = new HashMap<>();
173 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
175 ProducerConfig.CLIENT_ID_CONFIG,
176 chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
177 return new KafkaProducer<>(
184 StringSerializer stringSerializer()
186 return new StringSerializer();
190 JsonSerializer<AbstractTo> chatMessageSerializer()
192 JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
193 serializer.configure(
194 Map.of(JsonSerializer.TYPE_MAPPINGS,
195 "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
196 "message:" + ChatMessageTo.class.getCanonicalName()),
202 Consumer<String, ChatMessageTo> chatMessageChannelConsumer(
203 Properties defaultConsumerProperties,
204 ChatBackendProperties chatBackendProperties,
205 StringDeserializer stringDeserializer,
206 JsonDeserializer<ChatMessageTo> messageDeserializer)
208 Map<String, Object> properties = new HashMap<>();
209 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
211 ConsumerConfig.CLIENT_ID_CONFIG,
212 chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
214 ConsumerConfig.GROUP_ID_CONFIG,
215 "chat_message_channel");
216 return new KafkaConsumer<>(
219 messageDeserializer);
223 StringDeserializer stringDeserializer()
225 return new StringDeserializer();
229 JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
231 JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
232 deserializer.configure(
234 JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
235 JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
236 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
242 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
244 Properties properties = new Properties();
245 properties.setProperty(
246 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
247 chatBackendProperties.getKafka().getBootstrapServers());
252 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
254 Properties properties = new Properties();
255 properties.setProperty(
256 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
257 chatBackendProperties.getKafka().getBootstrapServers());
258 properties.setProperty(
259 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
261 properties.setProperty(
262 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
270 return ZoneId.systemDefault();