1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.Properties;
// Declares the factory methods that wire up the Kafka-based persistence
// services. The class is guarded by @ConditionalOnProperty with prefix
// "chat.backend" and havingValue "kafka"; the property name itself is not
// visible in this excerpt.
28 @ConditionalOnProperty(
29 prefix = "chat.backend",
31 havingValue = "kafka")
33 public class KafkaServicesConfiguration
// Builds the ChatHome for the Kafka backend: a KafkaChatHome constructed from
// the given sharding strategy and chat message channel.
36 ChatHome kafkaChatHome(
37 ShardingStrategy shardingStrategy,
38 ChatMessageChannel chatMessageChannel)
40 return new KafkaChatHome(shardingStrategy, chatMessageChannel);
// Creates the KafkaChatRoomFactory on top of the given ChatRoomChannel.
44 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
46 return new KafkaChatRoomFactory(chatRoomChannel);
// Creates the ChatRoomChannel. The first constructor argument is the topic
// read from properties.getKafka().getTopic(), followed by the injected
// Integer-keyed ChatRoomTo producer and consumer; the last visible argument
// is properties.getChatroomBufferSize(). The remaining parameters and
// constructor arguments are not visible in this excerpt.
// NOTE(review): chatMessageChannel(...) in this class also passes
// getKafka().getTopic(), although its clients use String keys instead of
// Integer keys -- confirm that both channels are meant to share one topic.
50 ChatRoomChannel chatRoomChannel(
51 ChatBackendProperties properties,
52 Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
53 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
54 ShardingStrategy shardingStrategy,
55 ChatMessageChannel chatMessageChannel,
58 return new ChatRoomChannel(
59 properties.getKafka().getTopic(),
60 chatRoomChannelProducer,
61 chatRoomChannelConsumer,
65 properties.getChatroomBufferSize());
// Creates the producer for the chat room channel as a KafkaProducer whose
// first constructor argument is defaultProducerProperties. The remaining
// constructor arguments are not visible in this excerpt; presumably they are
// the injected integerSerializer and chatRoomSerializer -- TODO confirm.
69 Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
70 Properties defaultProducerProperties,
71 IntegerSerializer integerSerializer,
72 JsonSerializer<ChatRoomTo> chatRoomSerializer)
74 return new KafkaProducer<>(
75 defaultProducerProperties,
// Provides a new IntegerSerializer instance.
81 IntegerSerializer integerSerializer()
83 return new IntegerSerializer();
// Provides the JsonSerializer for ChatRoomTo values. Only the instantiation
// is visible here; any further configuration and the return statement are
// outside this excerpt.
87 JsonSerializer<ChatRoomTo> chatRoomSerializer()
89 JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
// Creates the consumer for the chat room channel: layers a new Properties
// object over defaultConsumerProperties, sets ConsumerConfig.GROUP_ID_CONFIG
// to "chat_room_channel", and builds a KafkaConsumer whose last constructor
// argument is chatRoomDeserializer (the earlier arguments are not visible).
// NOTE(review): new Properties(defaults) exposes the defaults only through
// getProperty()/stringPropertyNames(), not through entrySet()/forEach(). If
// this object is what gets handed to KafkaConsumer, verify that the inherited
// settings (bootstrap servers, client id, ...) actually reach the client;
// copying the defaults with putAll() would remove the doubt.
94 Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
95 Properties defaultConsumerProperties,
96 IntegerDeserializer integerDeserializer,
97 JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
99 Properties properties = new Properties(defaultConsumerProperties);
100 properties.setProperty(
101 ConsumerConfig.GROUP_ID_CONFIG,
102 "chat_room_channel");
103 return new KafkaConsumer<>(
106 chatRoomDeserializer);
// Provides a new IntegerDeserializer instance.
110 IntegerDeserializer integerDeserializer()
112 return new IntegerDeserializer();
// Provides the JsonDeserializer for ChatRoomTo values. Only the instantiation
// is visible here; any further configuration and the return statement are
// outside this excerpt.
116 JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
118 JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
// Creates the ShardingStrategy as a KafkaLikeShardingStrategy, parameterized
// with the partition count read from properties.getKafka().getNumPartitions().
123 ShardingStrategy shardingStrategy(ChatBackendProperties properties)
125 return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
// Creates the ChatMessageChannel. The first constructor argument is the topic
// read from properties.getKafka().getTopic(), followed by the injected
// String-keyed MessageTo producer and consumer; the last visible argument is
// properties.getKafka().getNumPartitions(). Further parameters and
// constructor arguments are not visible in this excerpt.
// NOTE(review): chatRoomChannel(...) in this class reads the same
// getKafka().getTopic() while using Integer keys -- confirm that both
// channels are meant to share one topic.
129 ChatMessageChannel chatMessageChannel(
130 ChatBackendProperties properties,
131 Producer<String, MessageTo> chatMessageChannelProducer,
132 Consumer<String, MessageTo> chatMessageChannelConsumer,
135 return new ChatMessageChannel(
136 properties.getKafka().getTopic(),
137 chatMessageChannelProducer,
138 chatMessageChannelConsumer,
140 properties.getKafka().getNumPartitions());
// Creates the producer for the chat message channel as a KafkaProducer whose
// first constructor argument is defaultProducerProperties. The remaining
// constructor arguments are not visible in this excerpt; presumably they are
// the injected stringSerializer and messageSerializer -- TODO confirm.
144 Producer<String, MessageTo> chatMessageChannelProducer(
145 Properties defaultProducerProperties,
146 StringSerializer stringSerializer,
147 JsonSerializer<MessageTo> messageSerializer)
149 return new KafkaProducer<>(
150 defaultProducerProperties,
// Provides a new StringSerializer instance.
156 StringSerializer stringSerializer()
158 return new StringSerializer();
// Provides the JsonSerializer for MessageTo values. Only the instantiation is
// visible here; any further configuration and the return statement are
// outside this excerpt.
162 JsonSerializer<MessageTo> chatMessageSerializer()
164 JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
// Creates the consumer for the chat message channel: layers a new Properties
// object over defaultConsumerProperties, sets ConsumerConfig.GROUP_ID_CONFIG
// to "chat_message_channel", and builds a KafkaConsumer whose last
// constructor argument is messageDeserializer (the earlier arguments are not
// visible).
// NOTE(review): new Properties(defaults) exposes the defaults only through
// getProperty()/stringPropertyNames(), not through entrySet()/forEach(). If
// this object is what gets handed to KafkaConsumer, verify that the inherited
// settings (bootstrap servers, client id, ...) actually reach the client;
// copying the defaults with putAll() would remove the doubt.
169 Consumer<String, MessageTo> chatMessageChannelConsumer(
170 Properties defaultConsumerProperties,
171 StringDeserializer stringDeserializer,
172 JsonDeserializer<MessageTo> messageDeserializer)
174 Properties properties = new Properties(defaultConsumerProperties);
175 properties.setProperty(
176 ConsumerConfig.GROUP_ID_CONFIG,
177 "chat_message_channel");
178 return new KafkaConsumer<>(
181 messageDeserializer);
// Provides a new StringDeserializer instance.
185 StringDeserializer stringDeserializer()
187 return new StringDeserializer();
// Provides the JsonDeserializer for MessageTo values. Only the instantiation
// is visible here; any further configuration and the return statement are
// outside this excerpt.
191 JsonDeserializer<MessageTo> chatMessageDeserializer()
193 JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
// Assembles the base settings for the Kafka producers: starts from an empty
// Properties object and sets ProducerConfig.CLIENT_ID_CONFIG and
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG from chatBackendProperties.getKafka().
// The return statement is not visible in this excerpt.
198 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
200 Properties properties = new Properties();
201 properties.setProperty(
202 ProducerConfig.CLIENT_ID_CONFIG,
203 chatBackendProperties.getKafka().getClientId());
204 properties.setProperty(
205 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
206 chatBackendProperties.getKafka().getBootstrapServers());
// Assembles the base settings for the Kafka consumers: starts from an empty
// Properties object and sets ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and
// ConsumerConfig.CLIENT_ID_CONFIG from chatBackendProperties.getKafka(), then
// ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG and
// ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. The values of the last two
// settings and the return statement are not visible in this excerpt.
// NOTE(review): the consumer factory methods in this class build on these
// defaults, so they would all carry the same client id -- confirm that
// sharing one client id across several clients is intended.
211 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
213 Properties properties = new Properties();
214 properties.setProperty(
215 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
216 chatBackendProperties.getKafka().getBootstrapServers());
217 properties.setProperty(
218 ConsumerConfig.CLIENT_ID_CONFIG,
219 chatBackendProperties.getKafka().getClientId());
220 properties.setProperty(
221 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
223 properties.setProperty(
224 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
232 return ZoneId.systemDefault();