NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Properties;
28
29
30 @ConditionalOnProperty(
31     prefix = "chat.backend",
32     name = "services",
33     havingValue = "kafka")
34 @Configuration
35 public class KafkaServicesConfiguration
36 {
37   @Bean
38   ChatHome kafkaChatHome(
39       ShardingStrategy shardingStrategy,
40       ChatMessageChannel chatMessageChannel)
41   {
42     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
43   }
44
45   @Bean
46   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
47   {
48     return new KafkaChatRoomFactory(chatRoomChannel);
49   }
50
51   @Bean
52   ChatRoomChannel chatRoomChannel(
53       ChatBackendProperties properties,
54       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
55       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
56       ShardingStrategy shardingStrategy,
57       ChatMessageChannel chatMessageChannel,
58       Clock clock)
59   {
60     return new ChatRoomChannel(
61         properties.getKafka().getTopic(),
62         chatRoomChannelProducer,
63         chatRoomChannelConsumer,
64         shardingStrategy,
65         chatMessageChannel,
66         clock,
67         properties.getChatroomBufferSize());
68   }
69
70   @Bean
71   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
72       Properties defaultProducerProperties,
73       ChatBackendProperties chatBackendProperties,
74       IntegerSerializer integerSerializer,
75       JsonSerializer<ChatRoomTo> chatRoomSerializer)
76   {
77     Map<String, Object> properties = new HashMap<>();
78     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
79     properties.put(
80         ProducerConfig.CLIENT_ID_CONFIG,
81         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
82     return new KafkaProducer<>(
83         properties,
84         integerSerializer,
85         chatRoomSerializer);
86   }
87
88   @Bean
89   IntegerSerializer integerSerializer()
90   {
91     return new IntegerSerializer();
92   }
93
94   @Bean
95   JsonSerializer<ChatRoomTo> chatRoomSerializer()
96   {
97     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
98     serializer.configure(
99         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
100         false);
101     return serializer;
102   }
103
104   @Bean
105   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
106       Properties defaultConsumerProperties,
107       ChatBackendProperties chatBackendProperties,
108       IntegerDeserializer integerDeserializer,
109       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
110   {
111     Map<String, Object> properties = new HashMap<>();
112     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
113     properties.put(
114         ConsumerConfig.CLIENT_ID_CONFIG,
115         chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
116     properties.put(
117         ConsumerConfig.GROUP_ID_CONFIG,
118         "chat_room_channel");
119     return new KafkaConsumer<>(
120         properties,
121         integerDeserializer,
122         chatRoomDeserializer);
123   }
124
125   @Bean
126   IntegerDeserializer integerDeserializer()
127   {
128     return new IntegerDeserializer();
129   }
130
131   @Bean
132   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
133   {
134     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
135     deserializer.configure(
136         Map.of(
137             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
138             JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
139             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
140         false );
141     return deserializer;
142   }
143
144   @Bean
145   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
146   {
147     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
148   }
149
150   @Bean
151   ChatMessageChannel chatMessageChannel(
152       ChatBackendProperties properties,
153       Producer<String, MessageTo> chatMessageChannelProducer,
154       Consumer<String, MessageTo> chatMessageChannelConsumer,
155       ZoneId zoneId)
156   {
157     return new ChatMessageChannel(
158         properties.getKafka().getTopic(),
159         chatMessageChannelProducer,
160         chatMessageChannelConsumer,
161         zoneId,
162         properties.getKafka().getNumPartitions());
163   }
164
165   @Bean
166   Producer<String, MessageTo>  chatMessageChannelProducer(
167       Properties defaultProducerProperties,
168       ChatBackendProperties chatBackendProperties,
169       StringSerializer stringSerializer,
170       JsonSerializer<MessageTo> messageSerializer)
171   {
172     Map<String, Object> properties = new HashMap<>();
173     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
174     properties.put(
175         ProducerConfig.CLIENT_ID_CONFIG,
176         chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
177     return new KafkaProducer<>(
178         properties,
179         stringSerializer,
180         messageSerializer);
181   }
182
183   @Bean
184   StringSerializer stringSerializer()
185   {
186     return new StringSerializer();
187   }
188
189   @Bean
190   JsonSerializer<MessageTo> chatMessageSerializer()
191   {
192     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
193     serializer.configure(
194         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
195         false);
196     return serializer;
197   }
198
199   @Bean
200   Consumer<String, MessageTo>  chatMessageChannelConsumer(
201       Properties defaultConsumerProperties,
202       ChatBackendProperties chatBackendProperties,
203       StringDeserializer stringDeserializer,
204       JsonDeserializer<MessageTo> messageDeserializer)
205   {
206     Map<String, Object> properties = new HashMap<>();
207     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
208     properties.put(
209         ConsumerConfig.CLIENT_ID_CONFIG,
210         chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
211     properties.put(
212         ConsumerConfig.GROUP_ID_CONFIG,
213         "chat_message_channel");
214     return new KafkaConsumer<>(
215         properties,
216         stringDeserializer,
217         messageDeserializer);
218   }
219
220   @Bean
221   StringDeserializer stringDeserializer()
222   {
223     return new StringDeserializer();
224   }
225
226   @Bean
227   JsonDeserializer<MessageTo> chatMessageDeserializer()
228   {
229     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
230     deserializer.configure(
231         Map.of(
232             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
233             JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
234             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
235         false );
236     return deserializer;
237   }
238
239   @Bean
240   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
241   {
242     Properties properties = new Properties();
243     properties.setProperty(
244         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
245         chatBackendProperties.getKafka().getBootstrapServers());
246     return properties;
247   }
248
249   @Bean
250   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
251   {
252     Properties properties = new Properties();
253     properties.setProperty(
254         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
255         chatBackendProperties.getKafka().getBootstrapServers());
256     properties.setProperty(
257         ConsumerConfig.CLIENT_ID_CONFIG,
258         chatBackendProperties.getKafka().getClientIdPrefix());
259     properties.setProperty(
260         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
261         "false");
262     properties.setProperty(
263         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
264         "earliest");
265     return properties;
266   }
267
268   @Bean
269   ZoneId zoneId()
270   {
271     return ZoneId.systemDefault();
272   }
273 }