NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.IntegerDeserializer;
15 import org.apache.kafka.common.serialization.IntegerSerializer;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.kafka.support.serializer.JsonDeserializer;
22 import org.springframework.kafka.support.serializer.JsonSerializer;
23
24 import java.time.Clock;
25 import java.time.ZoneId;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Properties;
29
30
31 @ConditionalOnProperty(
32     prefix = "chat.backend",
33     name = "services",
34     havingValue = "kafka")
35 @Configuration
36 public class KafkaServicesConfiguration
37 {
38   @Bean
39   ChatHome kafkaChatHome(
40       ShardingStrategy shardingStrategy,
41       ChatMessageChannel chatMessageChannel)
42   {
43     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
44   }
45
46   @Bean
47   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
48   {
49     return new KafkaChatRoomFactory(chatRoomChannel);
50   }
51
52   @Bean
53   ChatRoomChannel chatRoomChannel(
54       ChatBackendProperties properties,
55       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
56       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
57       ShardingStrategy shardingStrategy,
58       ChatMessageChannel chatMessageChannel,
59       Clock clock)
60   {
61     return new ChatRoomChannel(
62         properties.getKafka().getTopic(),
63         chatRoomChannelProducer,
64         chatRoomChannelConsumer,
65         shardingStrategy,
66         chatMessageChannel,
67         clock,
68         properties.getChatroomBufferSize());
69   }
70
71   @Bean
72   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
73       Properties defaultProducerProperties,
74       IntegerSerializer integerSerializer,
75       JsonSerializer<ChatRoomTo> chatRoomSerializer)
76   {
77     return new KafkaProducer<>(
78         defaultProducerProperties,
79         integerSerializer,
80         chatRoomSerializer);
81   }
82
83   @Bean
84   IntegerSerializer integerSerializer()
85   {
86     return new IntegerSerializer();
87   }
88
89   @Bean
90   JsonSerializer<ChatRoomTo> chatRoomSerializer()
91   {
92     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
93     serializer.configure(
94         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
95         false);
96     return serializer;
97   }
98
99   @Bean
100   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
101       Properties defaultConsumerProperties,
102       IntegerDeserializer integerDeserializer,
103       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
104   {
105     Map<String, Object> properties = new HashMap<>();
106     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
107     properties.put(
108         ConsumerConfig.GROUP_ID_CONFIG,
109         "chat_room_channel");
110     return new KafkaConsumer<>(
111         properties,
112         integerDeserializer,
113         chatRoomDeserializer);
114   }
115
116   @Bean
117   IntegerDeserializer integerDeserializer()
118   {
119     return new IntegerDeserializer();
120   }
121
122   @Bean
123   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
124   {
125     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
126     deserializer.configure(
127         Map.of(
128             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
129             JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
130             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
131         false );
132     return deserializer;
133   }
134
135   @Bean
136   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
137   {
138     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
139   }
140
141   @Bean
142   ChatMessageChannel chatMessageChannel(
143       ChatBackendProperties properties,
144       Producer<String, MessageTo> chatMessageChannelProducer,
145       Consumer<String, MessageTo> chatMessageChannelConsumer,
146       ZoneId zoneId)
147   {
148     return new ChatMessageChannel(
149         properties.getKafka().getTopic(),
150         chatMessageChannelProducer,
151         chatMessageChannelConsumer,
152         zoneId,
153         properties.getKafka().getNumPartitions());
154   }
155
156   @Bean
157   Producer<String, MessageTo>  chatMessageChannelProducer(
158       Properties defaultProducerProperties,
159       StringSerializer stringSerializer,
160       JsonSerializer<MessageTo> messageSerializer)
161   {
162     return new KafkaProducer<>(
163         defaultProducerProperties,
164         stringSerializer,
165         messageSerializer);
166   }
167
168   @Bean
169   StringSerializer stringSerializer()
170   {
171     return new StringSerializer();
172   }
173
174   @Bean
175   JsonSerializer<MessageTo> chatMessageSerializer()
176   {
177     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
178     serializer.configure(
179         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
180         false);
181     return serializer;
182   }
183
184   @Bean
185   Consumer<String, MessageTo>  chatMessageChannelConsumer(
186       Properties defaultConsumerProperties,
187       StringDeserializer stringDeserializer,
188       JsonDeserializer<MessageTo> messageDeserializer)
189   {
190     Map<String, Object> properties = new HashMap<>();
191     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
192     properties.put(
193         ConsumerConfig.GROUP_ID_CONFIG,
194         "chat_message_channel");
195     return new KafkaConsumer<>(
196         properties,
197         stringDeserializer,
198         messageDeserializer);
199   }
200
201   @Bean
202   StringDeserializer stringDeserializer()
203   {
204     return new StringDeserializer();
205   }
206
207   @Bean
208   JsonDeserializer<MessageTo> chatMessageDeserializer()
209   {
210     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
211     deserializer.configure(
212         Map.of(
213             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
214             JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
215             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
216         false );
217     return deserializer;
218   }
219
220   @Bean
221   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
222   {
223     Properties properties = new Properties();
224     properties.setProperty(
225         ProducerConfig.CLIENT_ID_CONFIG,
226         chatBackendProperties.getKafka().getClientId());
227     properties.setProperty(
228         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
229         chatBackendProperties.getKafka().getBootstrapServers());
230     return properties;
231   }
232
233   @Bean
234   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
235   {
236     Properties properties = new Properties();
237     properties.setProperty(
238         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
239         chatBackendProperties.getKafka().getBootstrapServers());
240     properties.setProperty(
241         ConsumerConfig.CLIENT_ID_CONFIG,
242         chatBackendProperties.getKafka().getClientId());
243     properties.setProperty(
244         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
245         "false");
246     properties.setProperty(
247         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
248         "earliest");
249     return properties;
250   }
251
252   @Bean
253   ZoneId zoneId()
254   {
255     return ZoneId.systemDefault();
256   }
257 }