1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
27 import java.util.Properties;
30 @ConditionalOnProperty(
31 prefix = "chat.backend",
33 havingValue = "kafka")
35 public class KafkaServicesConfiguration
// Factory method for the ChatHome used by the Kafka persistence setup.
// Returns a new KafkaChatHome constructed from the given ShardingStrategy
// and ChatMessageChannel.
// NOTE(review): annotations and braces are not visible in this excerpt;
// presumably this is a Spring @Bean method -- confirm against the full file.
38 ChatHome kafkaChatHome(
39 ShardingStrategy shardingStrategy,
40 ChatMessageChannel chatMessageChannel)
42 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
// Factory method for the KafkaChatRoomFactory.
// Returns a new KafkaChatRoomFactory that wraps the given ChatRoomChannel.
46 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
48 return new KafkaChatRoomFactory(chatRoomChannel);
// Factory method for the ChatRoomChannel.
// Visible constructor arguments: the topic read from
// properties.getKafka().getTopic(), the chat-room producer, the chat-room
// consumer, and properties.getChatroomBufferSize() as the last argument.
// NOTE(review): the listing skips source lines 58-59 and 64-66, so at least
// one further parameter and several constructor arguments are not visible
// here -- confirm against the full file before changing this method.
52 ChatRoomChannel chatRoomChannel(
53 ChatBackendProperties properties,
54 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
55 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
56 ShardingStrategy shardingStrategy,
57 ChatMessageChannel chatMessageChannel,
60 return new ChatRoomChannel(
61 properties.getKafka().getTopic(),
62 chatRoomChannelProducer,
63 chatRoomChannelConsumer,
67 properties.getChatroomBufferSize());
// Factory method for the producer that writes ChatRoomTo values keyed by Integer.
// Creates a KafkaProducer whose first constructor argument is the given
// defaultProducerProperties.
// NOTE(review): the remaining constructor arguments (source lines 78-79) are
// not visible in this excerpt; presumably the two serializer parameters are
// passed there -- confirm against the full file.
71 Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
72 Properties defaultProducerProperties,
73 IntegerSerializer integerSerializer,
74 JsonSerializer<ChatRoomTo> chatRoomSerializer)
76 return new KafkaProducer<>(
77 defaultProducerProperties,
// Factory method that returns a fresh Kafka IntegerSerializer instance.
83 IntegerSerializer integerSerializer()
85 return new IntegerSerializer();
// Factory method for the JSON serializer of ChatRoomTo values.
// Instantiates a JsonSerializer with its no-argument constructor.
// NOTE(review): any further configuration and the return statement
// (source lines 92-93) are not visible in this excerpt.
89 JsonSerializer<ChatRoomTo> chatRoomSerializer()
91 JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
// Factory method for the consumer that reads ChatRoomTo values keyed by Integer.
// Copies every entry of defaultConsumerProperties into a new HashMap (keys
// converted with toString()), then creates a KafkaConsumer whose last
// visible constructor argument is chatRoomDeserializer.
// NOTE(review): the statement that uses GROUP_ID_CONFIG / "chat_room_channel"
// starts on a line missing from this excerpt (source line 103); presumably it
// is properties.put(...), setting the consumer group id -- confirm.
// NOTE(review): the first KafkaConsumer arguments (source lines 107-108) are
// not visible either.
96 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
97 Properties defaultConsumerProperties,
98 IntegerDeserializer integerDeserializer,
99 JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
101 Map<String, Object> properties = new HashMap<>();
102 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
104 ConsumerConfig.GROUP_ID_CONFIG,
105 "chat_room_channel");
106 return new KafkaConsumer<>(
109 chatRoomDeserializer);
// Factory method that returns a fresh Kafka IntegerDeserializer instance.
113 IntegerDeserializer integerDeserializer()
115 return new IntegerDeserializer();
// Factory method for the JSON deserializer of ChatRoomTo values.
// Instantiates a JsonDeserializer with its no-argument constructor.
// NOTE(review): any further configuration and the return statement are not
// visible in this excerpt -- confirm against the full file.
119 JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
121 JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
// Factory method for the ShardingStrategy.
// Returns a KafkaLikeShardingStrategy constructed with the value of
// properties.getKafka().getNumPartitions().
126 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
128 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
// Factory method for the ChatMessageChannel.
// Visible constructor arguments: the topic read from
// properties.getKafka().getTopic(), the message producer, the message
// consumer, and properties.getKafka().getNumPartitions() as the last argument.
// NOTE(review): chatRoomChannel also passes properties.getKafka().getTopic();
// confirm that both channels are meant to use the same topic.
// NOTE(review): source lines 136-137 and 142 are missing from this excerpt,
// so a further parameter and constructor argument may exist.
132 ChatMessageChannel chatMessageChannel(
133 ChatBackendProperties properties,
134 Producer<String, MessageTo> chatMessageChannelProducer,
135 Consumer<String, MessageTo> chatMessageChannelConsumer,
138 return new ChatMessageChannel(
139 properties.getKafka().getTopic(),
140 chatMessageChannelProducer,
141 chatMessageChannelConsumer,
143 properties.getKafka().getNumPartitions());
// Factory method for the producer that writes MessageTo values keyed by String.
// Creates a KafkaProducer whose first constructor argument is the given
// defaultProducerProperties.
// NOTE(review): the remaining constructor arguments (source lines 154-155)
// are not visible in this excerpt; presumably the two serializer parameters
// are passed there -- confirm against the full file.
147 Producer<String, MessageTo> chatMessageChannelProducer(
148 Properties defaultProducerProperties,
149 StringSerializer stringSerializer,
150 JsonSerializer<MessageTo> messageSerializer)
152 return new KafkaProducer<>(
153 defaultProducerProperties,
// Factory method that returns a fresh Kafka StringSerializer instance.
159 StringSerializer stringSerializer()
161 return new StringSerializer();
// Factory method for the JSON serializer of MessageTo values.
// Instantiates a JsonSerializer with its no-argument constructor.
// NOTE(review): any further configuration and the return statement are not
// visible in this excerpt -- confirm against the full file.
165 JsonSerializer<MessageTo> chatMessageSerializer()
167 JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
// Factory method for the consumer that reads MessageTo values keyed by String.
// Copies every entry of defaultConsumerProperties into a new HashMap (keys
// converted with toString()), then creates a KafkaConsumer whose last
// visible constructor argument is messageDeserializer.
// NOTE(review): the statement that uses GROUP_ID_CONFIG / "chat_message_channel"
// starts on a line missing from this excerpt (source line 179); presumably it
// is properties.put(...), setting the consumer group id -- confirm.
// NOTE(review): the first KafkaConsumer arguments (source lines 183-184) are
// not visible either.
172 Consumer<String, MessageTo> chatMessageChannelConsumer(
173 Properties defaultConsumerProperties,
174 StringDeserializer stringDeserializer,
175 JsonDeserializer<MessageTo> messageDeserializer)
177 Map<String, Object> properties = new HashMap<>();
178 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
180 ConsumerConfig.GROUP_ID_CONFIG,
181 "chat_message_channel");
182 return new KafkaConsumer<>(
185 messageDeserializer);
// Factory method that returns a fresh Kafka StringDeserializer instance.
189 StringDeserializer stringDeserializer()
191 return new StringDeserializer();
// Factory method for the JSON deserializer of MessageTo values.
// Instantiates a JsonDeserializer with its no-argument constructor.
// NOTE(review): any further configuration and the return statement are not
// visible in this excerpt -- confirm against the full file.
195 JsonDeserializer<MessageTo> chatMessageDeserializer()
197 JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
// Builds the base producer configuration as a java.util.Properties object.
// Sets the producer client id and the bootstrap servers from
// chatBackendProperties.getKafka().
// NOTE(review): chatRoomChannelProducer and chatMessageChannelProducer both
// take a Properties parameter named defaultProducerProperties; if they receive
// this object, both producers share one client.id -- confirm this is intended.
// NOTE(review): the return statement is not visible in this excerpt.
202 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
204 Properties properties = new Properties();
205 properties.setProperty(
206 ProducerConfig.CLIENT_ID_CONFIG,
207 chatBackendProperties.getKafka().getClientId());
208 properties.setProperty(
209 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
210 chatBackendProperties.getKafka().getBootstrapServers());
// Builds the base consumer configuration as a java.util.Properties object.
// Sets the bootstrap servers and the consumer client id from
// chatBackendProperties.getKafka(), and also sets ENABLE_AUTO_COMMIT_CONFIG
// and AUTO_OFFSET_RESET_CONFIG.
// NOTE(review): the values assigned to ENABLE_AUTO_COMMIT_CONFIG and
// AUTO_OFFSET_RESET_CONFIG (source lines 226 and 229) and the return
// statement are not visible in this excerpt -- confirm against the full file.
215 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
217 Properties properties = new Properties();
218 properties.setProperty(
219 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
220 chatBackendProperties.getKafka().getBootstrapServers());
221 properties.setProperty(
222 ConsumerConfig.CLIENT_ID_CONFIG,
223 chatBackendProperties.getKafka().getClientId());
224 properties.setProperty(
225 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
227 properties.setProperty(
228 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
236 return ZoneId.systemDefault();