NG
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Properties;
28
29
30 @ConditionalOnProperty(
31     prefix = "chat.backend",
32     name = "services",
33     havingValue = "kafka")
34 @Configuration
35 public class KafkaServicesConfiguration
36 {
37   @Bean
38   ChatHome kafkaChatHome(
39       ShardingStrategy shardingStrategy,
40       ChatMessageChannel chatMessageChannel)
41   {
42     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
43   }
44
45   @Bean
46   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
47   {
48     return new KafkaChatRoomFactory(chatRoomChannel);
49   }
50
51   @Bean
52   ChatRoomChannel chatRoomChannel(
53       ChatBackendProperties properties,
54       Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
55       Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
56       ShardingStrategy shardingStrategy,
57       ChatMessageChannel chatMessageChannel,
58       Clock clock)
59   {
60     return new ChatRoomChannel(
61         properties.getKafka().getChatroomChannelTopic(),
62         chatRoomChannelProducer,
63         chatRoomChannelConsumer,
64         shardingStrategy,
65         chatMessageChannel,
66         clock,
67         properties.getChatroomBufferSize());
68   }
69
70   @Bean
71   Producer<Integer, CreateChatRoomRequestTo>  chatRoomChannelProducer(
72       Properties defaultProducerProperties,
73       ChatBackendProperties chatBackendProperties,
74       IntegerSerializer integerSerializer,
75       JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
76   {
77     Map<String, Object> properties = new HashMap<>();
78     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
79     properties.put(
80         ProducerConfig.CLIENT_ID_CONFIG,
81         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
82     return new KafkaProducer<>(
83         properties,
84         integerSerializer,
85         chatRoomSerializer);
86   }
87
88   @Bean
89   IntegerSerializer integerSerializer()
90   {
91     return new IntegerSerializer();
92   }
93
94   @Bean
95   JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
96   {
97     JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
98     serializer.configure(
99         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
100         false);
101     return serializer;
102   }
103
104   @Bean
105   Consumer<Integer, CreateChatRoomRequestTo>  chatRoomChannelConsumer(
106       Properties defaultConsumerProperties,
107       ChatBackendProperties chatBackendProperties,
108       IntegerDeserializer integerDeserializer,
109       JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
110   {
111     Map<String, Object> properties = new HashMap<>();
112     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
113     properties.put(
114         ConsumerConfig.CLIENT_ID_CONFIG,
115         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
116     properties.put(
117         ConsumerConfig.GROUP_ID_CONFIG,
118         "chat_room_channel");
119     return new KafkaConsumer<>(
120         properties,
121         integerDeserializer,
122         chatRoomDeserializer);
123   }
124
125   @Bean
126   IntegerDeserializer integerDeserializer()
127   {
128     return new IntegerDeserializer();
129   }
130
131   @Bean
132   JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
133   {
134     JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
135     deserializer.configure(
136         Map.of(
137             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
138             JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
139             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
140         false );
141     return deserializer;
142   }
143
144   @Bean
145   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
146   {
147     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
148   }
149
150   @Bean
151   ChatMessageChannel chatMessageChannel(
152       ChatBackendProperties properties,
153       Producer<String, AbstractTo> chatMessageChannelProducer,
154       Consumer<String, AbstractTo> chatMessageChannelConsumer,
155       ZoneId zoneId)
156   {
157     return new ChatMessageChannel(
158         properties.getKafka().getMessageChannelTopic(),
159         chatMessageChannelProducer,
160         chatMessageChannelConsumer,
161         zoneId,
162         properties.getKafka().getNumPartitions());
163   }
164
165   @Bean
166   Producer<String, AbstractTo>  chatMessageChannelProducer(
167       Properties defaultProducerProperties,
168       ChatBackendProperties chatBackendProperties,
169       StringSerializer stringSerializer,
170       JsonSerializer<AbstractTo> messageSerializer)
171   {
172     Map<String, Object> properties = new HashMap<>();
173     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
174     properties.put(
175         ProducerConfig.CLIENT_ID_CONFIG,
176         chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
177     return new KafkaProducer<>(
178         properties,
179         stringSerializer,
180         messageSerializer);
181   }
182
183   @Bean
184   StringSerializer stringSerializer()
185   {
186     return new StringSerializer();
187   }
188
189   @Bean
190   JsonSerializer<AbstractTo> chatMessageSerializer()
191   {
192     JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
193     serializer.configure(
194         Map.of(JsonSerializer.TYPE_MAPPINGS,
195             "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
196             "message:" + ChatMessageTo.class.getCanonicalName()),
197         false);
198     return serializer;
199   }
200
201   @Bean
202   Consumer<String, ChatMessageTo>  chatMessageChannelConsumer(
203       Properties defaultConsumerProperties,
204       ChatBackendProperties chatBackendProperties,
205       StringDeserializer stringDeserializer,
206       JsonDeserializer<ChatMessageTo> messageDeserializer)
207   {
208     Map<String, Object> properties = new HashMap<>();
209     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
210     properties.put(
211         ConsumerConfig.CLIENT_ID_CONFIG,
212         chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
213     properties.put(
214         ConsumerConfig.GROUP_ID_CONFIG,
215         "chat_message_channel");
216     return new KafkaConsumer<>(
217         properties,
218         stringDeserializer,
219         messageDeserializer);
220   }
221
222   @Bean
223   StringDeserializer stringDeserializer()
224   {
225     return new StringDeserializer();
226   }
227
228   @Bean
229   JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
230   {
231     JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
232     deserializer.configure(
233         Map.of(
234             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
235             JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
236             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
237         false );
238     return deserializer;
239   }
240
241   @Bean
242   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
243   {
244     Properties properties = new Properties();
245     properties.setProperty(
246         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
247         chatBackendProperties.getKafka().getBootstrapServers());
248     return properties;
249   }
250
251   @Bean
252   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
253   {
254     Properties properties = new Properties();
255     properties.setProperty(
256         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
257         chatBackendProperties.getKafka().getBootstrapServers());
258     properties.setProperty(
259         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
260         "false");
261     properties.setProperty(
262         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
263         "earliest");
264     return properties;
265   }
266
267   @Bean
268   ZoneId zoneId()
269   {
270     return ZoneId.systemDefault();
271   }
272 }