1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
27 import java.util.Properties;
30 @ConditionalOnProperty(
31 prefix = "chat.backend",
33 havingValue = "kafka")
35 public class KafkaServicesConfiguration
/**
 * Builds a {@code KafkaChatHome} from the given sharding strategy and
 * chat-message channel and exposes it under the {@code ChatHome} type.
 *
 * @param shardingStrategy   passed through to the {@code KafkaChatHome} constructor
 * @param chatMessageChannel passed through to the {@code KafkaChatHome} constructor
 * @return the newly created {@code KafkaChatHome}
 */
// NOTE(review): presumably a Spring @Bean factory method (the import exists),
// but the annotation and the braces are not part of this extract - confirm.
38 ChatHome kafkaChatHome(
39 ShardingStrategy shardingStrategy,
40 ChatMessageChannel chatMessageChannel)
42 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
/**
 * Returns a new {@code KafkaChatRoomFactory} constructed with the given
 * {@code ChatRoomChannel}.
 *
 * @param chatRoomChannel the channel handed to the factory's constructor
 * @return the newly created factory
 */
46 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
48 return new KafkaChatRoomFactory(chatRoomChannel);
/**
 * Creates the {@code ChatRoomChannel}. The visible constructor arguments are
 * the Kafka topic from the configuration properties, the chat-room producer,
 * the chat-room consumer and the configured chat-room buffer size.
 */
// NOTE(review): the embedded numbering jumps from 57 to 60 and from 63 to 67,
// so at least one parameter and three constructor arguments are missing from
// this extract. shardingStrategy and chatMessageChannel are presumably among
// the missing arguments - confirm against the complete file.
52 ChatRoomChannel chatRoomChannel(
53 ChatBackendProperties properties,
54 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
55 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
56 ShardingStrategy shardingStrategy,
57 ChatMessageChannel chatMessageChannel,
60 return new ChatRoomChannel(
// NOTE(review): chatMessageChannel(...) reads the very same getTopic() value;
// confirm that both channels are meant to share a single topic.
61 properties.getKafka().getTopic(),
62 chatRoomChannelProducer,
63 chatRoomChannelConsumer,
67 properties.getChatroomBufferSize());
/**
 * Creates the Kafka producer for chat-room records (Integer keys,
 * {@code ChatRoomTo} values). The configuration starts as a copy of the
 * default producer properties and gets a client id made of the configured
 * client-id prefix plus the suffix {@code _CHATROOM_CHANNEL_PRODUCER}.
 */
71 Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
72 Properties defaultProducerProperties,
73 ChatBackendProperties chatBackendProperties,
74 IntegerSerializer integerSerializer,
75 JsonSerializer<ChatRoomTo> chatRoomSerializer)
77 Map<String, Object> properties = new HashMap<>();
// Copies every default entry into the String-keyed map; keys are converted
// with toString(), values are taken over unchanged.
78 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): embedded line 79 is missing - presumably the
// "properties.put(" call that the next two lines are the arguments of.
80 ProducerConfig.CLIENT_ID_CONFIG,
81 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
// NOTE(review): the constructor arguments are not visible here; presumably
// the map plus integerSerializer and chatRoomSerializer - confirm.
82 return new KafkaProducer<>(
/**
 * Supplies a fresh Kafka {@code IntegerSerializer}.
 *
 * @return a new, unconfigured {@code IntegerSerializer}
 */
89 IntegerSerializer integerSerializer()
91 return new IntegerSerializer();
/**
 * Creates the JSON serializer for {@code ChatRoomTo} values. The only
 * setting visible here switches {@code ADD_TYPE_INFO_HEADERS} off.
 */
95 JsonSerializer<ChatRoomTo> chatRoomSerializer()
97 JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
// NOTE(review): embedded line 98 (presumably "serializer.configure(") and
// everything after line 99 (second configure argument, return statement) are
// missing from this extract - confirm against the complete file.
99 Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
/**
 * Creates the Kafka consumer for chat-room records (Integer keys,
 * {@code ChatRoomTo} values). The configuration starts as a copy of the
 * default consumer properties; the visible lines add a client id (configured
 * prefix plus {@code _CHATROOM_CHANNEL_CONSUMER}) and the fixed group id
 * {@code chat_room_channel}.
 */
105 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
106 Properties defaultConsumerProperties,
107 ChatBackendProperties chatBackendProperties,
108 IntegerDeserializer integerDeserializer,
109 JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
111 Map<String, Object> properties = new HashMap<>();
// Copies the defaults into a String-keyed map (keys via toString()).
112 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): embedded lines 113 and 116 are missing - presumably the
// "properties.put(" calls for the two key/value pairs below.
114 ConsumerConfig.CLIENT_ID_CONFIG,
115 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
117 ConsumerConfig.GROUP_ID_CONFIG,
118 "chat_room_channel");
// NOTE(review): embedded lines 120-121 are missing; presumably the map and
// integerDeserializer precede the value deserializer - confirm.
119 return new KafkaConsumer<>(
122 chatRoomDeserializer);
/**
 * Supplies a fresh Kafka {@code IntegerDeserializer}.
 *
 * @return a new, unconfigured {@code IntegerDeserializer}
 */
126 IntegerDeserializer integerDeserializer()
128 return new IntegerDeserializer();
/**
 * Creates the JSON deserializer for {@code ChatRoomTo} values and configures
 * it to ignore type-info headers, to default to {@code ChatRoomTo} as the
 * value type, and to trust the package of this configuration class.
 */
132 JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
134 JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
135 deserializer.configure(
// NOTE(review): embedded line 136 is missing - presumably the "Map.of("
// that opens the settings map listed below.
137 JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
138 JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
// The trusted package is whatever package getClass() reports at runtime.
139 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
// NOTE(review): the second configure(...) argument and the return statement
// are not part of this extract.
/**
 * Returns a {@code KafkaLikeShardingStrategy} constructed with the partition
 * count read from {@code properties.getKafka().getNumPartitions()}.
 *
 * @param properties the backend configuration supplying the partition count
 * @return the sharding strategy, exposed as {@code ShardingStrategy}
 */
145 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
147 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
/**
 * Creates the {@code ChatMessageChannel}. The visible constructor arguments
 * are the Kafka topic from the configuration properties, the message
 * producer, the message consumer and the configured number of partitions.
 */
// NOTE(review): the embedded numbering skips 155-156 and 161, so at least one
// parameter and one constructor argument are missing from this extract -
// confirm against the complete file.
151 ChatMessageChannel chatMessageChannel(
152 ChatBackendProperties properties,
153 Producer<String, MessageTo> chatMessageChannelProducer,
154 Consumer<String, MessageTo> chatMessageChannelConsumer,
157 return new ChatMessageChannel(
// NOTE(review): chatRoomChannel(...) reads the very same getTopic() value;
// confirm that both channels are meant to share a single topic.
158 properties.getKafka().getTopic(),
159 chatMessageChannelProducer,
160 chatMessageChannelConsumer,
162 properties.getKafka().getNumPartitions());
/**
 * Creates the Kafka producer for chat messages (String keys,
 * {@code MessageTo} values). The configuration starts as a copy of the
 * default producer properties and gets a client id made of the configured
 * client-id prefix plus the suffix {@code _MESSAGE_CHANNEL_PRODUCER}.
 */
// NOTE(review): the parameter is named messageSerializer while the factory
// method in this class is chatMessageSerializer(); injection presumably
// resolves by generic type rather than by name - confirm.
166 Producer<String, MessageTo> chatMessageChannelProducer(
167 Properties defaultProducerProperties,
168 ChatBackendProperties chatBackendProperties,
169 StringSerializer stringSerializer,
170 JsonSerializer<MessageTo> messageSerializer)
172 Map<String, Object> properties = new HashMap<>();
// Copies every default entry into the String-keyed map (keys via toString()).
173 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): embedded line 174 is missing - presumably the
// "properties.put(" call that the next two lines are the arguments of.
175 ProducerConfig.CLIENT_ID_CONFIG,
176 chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
// NOTE(review): the constructor arguments are not visible here; presumably
// the map plus stringSerializer and messageSerializer - confirm.
177 return new KafkaProducer<>(
/**
 * Supplies a fresh Kafka {@code StringSerializer}.
 *
 * @return a new, unconfigured {@code StringSerializer}
 */
184 StringSerializer stringSerializer()
186 return new StringSerializer();
/**
 * Creates the JSON serializer for {@code MessageTo} values. The only setting
 * visible here switches {@code ADD_TYPE_INFO_HEADERS} off.
 */
190 JsonSerializer<MessageTo> chatMessageSerializer()
192 JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
193 serializer.configure(
194 Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
// NOTE(review): the second configure(...) argument and the return statement
// are not part of this extract - confirm against the complete file.
/**
 * Creates the Kafka consumer for chat messages (String keys,
 * {@code MessageTo} values). The configuration starts as a copy of the
 * default consumer properties; the visible lines add a client id (configured
 * prefix plus {@code _MESSAGE_CHANNEL_CONSUMER}) and the fixed group id
 * {@code chat_message_channel}.
 */
// NOTE(review): the parameter is named messageDeserializer while the factory
// method in this class is chatMessageDeserializer(); injection presumably
// resolves by generic type rather than by name - confirm.
200 Consumer<String, MessageTo> chatMessageChannelConsumer(
201 Properties defaultConsumerProperties,
202 ChatBackendProperties chatBackendProperties,
203 StringDeserializer stringDeserializer,
204 JsonDeserializer<MessageTo> messageDeserializer)
206 Map<String, Object> properties = new HashMap<>();
// Copies the defaults into a String-keyed map (keys via toString()).
207 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
// NOTE(review): embedded lines 208 and 211 are missing - presumably the
// "properties.put(" calls for the two key/value pairs below.
209 ConsumerConfig.CLIENT_ID_CONFIG,
210 chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
212 ConsumerConfig.GROUP_ID_CONFIG,
213 "chat_message_channel");
// NOTE(review): embedded lines 215-216 are missing; presumably the map and
// stringDeserializer precede the value deserializer - confirm.
214 return new KafkaConsumer<>(
217 messageDeserializer);
/**
 * Supplies a fresh Kafka {@code StringDeserializer}.
 *
 * @return a new, unconfigured {@code StringDeserializer}
 */
221 StringDeserializer stringDeserializer()
223 return new StringDeserializer();
/**
 * Creates the JSON deserializer for {@code MessageTo} values and configures
 * it to ignore type-info headers, to default to {@code MessageTo} as the
 * value type, and to trust the package of this configuration class.
 */
227 JsonDeserializer<MessageTo> chatMessageDeserializer()
229 JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
230 deserializer.configure(
// NOTE(review): embedded line 231 is missing - presumably the "Map.of("
// that opens the settings map listed below.
232 JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
233 JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
// The trusted package is whatever package getClass() reports at runtime.
234 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
// NOTE(review): the second configure(...) argument and the return statement
// are not part of this extract.
/**
 * Builds the base {@code Properties} for the producers. The only entry
 * visible here is the bootstrap-server list taken from
 * {@code chatBackendProperties.getKafka().getBootstrapServers()}.
 */
240 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
242 Properties properties = new Properties();
243 properties.setProperty(
244 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
245 chatBackendProperties.getKafka().getBootstrapServers());
// NOTE(review): the return statement is not part of this extract; presumably
// the populated Properties object is returned - confirm.
/**
 * Builds the base {@code Properties} for the consumers: the bootstrap-server
 * list from the backend configuration, plus settings for
 * {@code ENABLE_AUTO_COMMIT_CONFIG} and {@code AUTO_OFFSET_RESET_CONFIG}.
 */
250 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
252 Properties properties = new Properties();
253 properties.setProperty(
254 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
255 chatBackendProperties.getKafka().getBootstrapServers());
// NOTE(review): the values assigned to the next two keys (embedded lines 258
// and 261) and the return statement are missing from this extract, so the
// chosen auto-commit and offset-reset behaviour cannot be stated from here.
256 properties.setProperty(
257 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
259 properties.setProperty(
260 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
268 return ZoneId.systemDefault();