1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
29 @ConditionalOnProperty(
30 prefix = "chat.backend",
32 havingValue = "kafka")
34 public class KafkaServicesConfiguration
37 ChatHome kafkaChatHome(
38 ShardingStrategy shardingStrategy,
39 ChatMessageChannel chatMessageChannel)
41 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
45 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
47 return new KafkaChatRoomFactory(chatRoomChannel);
51 ChatRoomChannel chatRoomChannel(
52 ChatBackendProperties properties,
53 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
54 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
55 ShardingStrategy shardingStrategy,
56 ChatMessageChannel chatMessageChannel,
59 return new ChatRoomChannel(
60 properties.getKafka().getTopic(),
61 chatRoomChannelProducer,
62 chatRoomChannelConsumer,
66 properties.getChatroomBufferSize());
70 Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
71 Map<String, Object> defaultProducerProperties,
72 IntegerSerializer integerSerializer,
73 JsonSerializer<ChatRoomTo> chatRoomSerializer)
75 return new KafkaProducer<>(
76 defaultProducerProperties,
82 IntegerSerializer integerSerializer()
84 return new IntegerSerializer();
88 JsonSerializer<ChatRoomTo> chatRoomSerializer()
90 JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
95 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
96 Map<String, Object> defaultConsumerProperties,
97 IntegerDeserializer integerDeserializer,
98 JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
100 Map<String, Object> properties = new HashMap<>(defaultConsumerProperties);
102 ConsumerConfig.GROUP_ID_CONFIG,
103 "chat_room_channel");
104 return new KafkaConsumer<>(
107 chatRoomDeserializer);
111 IntegerDeserializer integerDeserializer()
113 return new IntegerDeserializer();
117 JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
119 JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
124 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
126 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
130 ChatMessageChannel chatMessageChannel(
131 ChatBackendProperties properties,
132 Producer<String, MessageTo> chatMessageChannelProducer,
133 Consumer<String, MessageTo> chatMessageChannelConsumer,
136 return new ChatMessageChannel(
137 properties.getKafka().getTopic(),
138 chatMessageChannelProducer,
139 chatMessageChannelConsumer,
141 properties.getKafka().getNumPartitions());
145 Producer<String, MessageTo> chatMessageChannelProducer(
146 Map<String, Object> defaultProducerProperties,
147 StringSerializer stringSerializer,
148 JsonSerializer<MessageTo> messageSerializer)
150 return new KafkaProducer<>(
151 defaultProducerProperties,
157 StringSerializer stringSerializer()
159 return new StringSerializer();
163 JsonSerializer<MessageTo> chatMessageSerializer()
165 JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
170 Consumer<String, MessageTo> chatMessageChannelConsumer(
171 Map<String, Object> defaultConsumerProperties,
172 StringDeserializer stringDeserializer,
173 JsonDeserializer<MessageTo> messageDeserializer)
175 Map<String, Object> properties = new HashMap<>(defaultConsumerProperties);
177 ConsumerConfig.GROUP_ID_CONFIG,
178 "chat_message_channel");
179 return new KafkaConsumer<>(
182 messageDeserializer);
186 StringDeserializer stringDeserializer()
188 return new StringDeserializer();
192 JsonDeserializer<MessageTo> chatMessageDeserializer()
194 JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
199 Map<String, Object> defaultProducerProperties(ChatBackendProperties chatBackendProperties)
202 ProducerConfig.CLIENT_ID_CONFIG,
203 chatBackendProperties.getKafka().getClientId(),
204 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
205 chatBackendProperties.getKafka().getBootstrapServers());
209 Map<String, Object> defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
212 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
213 chatBackendProperties.getKafka().getBootstrapServers(),
214 ConsumerConfig.CLIENT_ID_CONFIG,
215 chatBackendProperties.getKafka().getClientId(),
216 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
218 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
225 return ZoneId.systemDefault();