NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Properties;
28
29
30 @ConditionalOnProperty(
31     prefix = "chat.backend",
32     name = "services",
33     havingValue = "kafka")
34 @Configuration
35 public class KafkaServicesConfiguration
36 {
37   @Bean
38   ChatHome kafkaChatHome(
39       ShardingStrategy shardingStrategy,
40       ChatMessageChannel chatMessageChannel)
41   {
42     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
43   }
44
45   @Bean
46   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
47   {
48     return new KafkaChatRoomFactory(chatRoomChannel);
49   }
50
51   @Bean
52   ChatRoomChannel chatRoomChannel(
53       ChatBackendProperties properties,
54       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
55       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
56       ShardingStrategy shardingStrategy,
57       ChatMessageChannel chatMessageChannel,
58       Clock clock)
59   {
60     return new ChatRoomChannel(
61         properties.getKafka().getTopic(),
62         chatRoomChannelProducer,
63         chatRoomChannelConsumer,
64         shardingStrategy,
65         chatMessageChannel,
66         clock,
67         properties.getChatroomBufferSize());
68   }
69
70   @Bean
71   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
72       Properties defaultProducerProperties,
73       IntegerSerializer integerSerializer,
74       JsonSerializer<ChatRoomTo> chatRoomSerializer)
75   {
76     return new KafkaProducer<>(
77         defaultProducerProperties,
78         integerSerializer,
79         chatRoomSerializer);
80   }
81
82   @Bean
83   IntegerSerializer integerSerializer()
84   {
85     return new IntegerSerializer();
86   }
87
88   @Bean
89   JsonSerializer<ChatRoomTo> chatRoomSerializer()
90   {
91     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
92     return serializer;
93   }
94
95   @Bean
96   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
97       Properties defaultConsumerProperties,
98       IntegerDeserializer integerDeserializer,
99       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
100   {
101     Map<String, Object> properties = new HashMap<>();
102     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
103     properties.put(
104         ConsumerConfig.GROUP_ID_CONFIG,
105         "chat_room_channel");
106     return new KafkaConsumer<>(
107         properties,
108         integerDeserializer,
109         chatRoomDeserializer);
110   }
111
112   @Bean
113   IntegerDeserializer integerDeserializer()
114   {
115     return new IntegerDeserializer();
116   }
117
118   @Bean
119   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
120   {
121     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
122     return deserializer;
123   }
124
125   @Bean
126   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
127   {
128     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
129   }
130
131   @Bean
132   ChatMessageChannel chatMessageChannel(
133       ChatBackendProperties properties,
134       Producer<String, MessageTo> chatMessageChannelProducer,
135       Consumer<String, MessageTo> chatMessageChannelConsumer,
136       ZoneId zoneId)
137   {
138     return new ChatMessageChannel(
139         properties.getKafka().getTopic(),
140         chatMessageChannelProducer,
141         chatMessageChannelConsumer,
142         zoneId,
143         properties.getKafka().getNumPartitions());
144   }
145
146   @Bean
147   Producer<String, MessageTo>  chatMessageChannelProducer(
148       Properties defaultProducerProperties,
149       StringSerializer stringSerializer,
150       JsonSerializer<MessageTo> messageSerializer)
151   {
152     return new KafkaProducer<>(
153         defaultProducerProperties,
154         stringSerializer,
155         messageSerializer);
156   }
157
158   @Bean
159   StringSerializer stringSerializer()
160   {
161     return new StringSerializer();
162   }
163
164   @Bean
165   JsonSerializer<MessageTo> chatMessageSerializer()
166   {
167     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
168     return serializer;
169   }
170
171   @Bean
172   Consumer<String, MessageTo>  chatMessageChannelConsumer(
173       Properties defaultConsumerProperties,
174       StringDeserializer stringDeserializer,
175       JsonDeserializer<MessageTo> messageDeserializer)
176   {
177     Map<String, Object> properties = new HashMap<>();
178     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
179     properties.put(
180         ConsumerConfig.GROUP_ID_CONFIG,
181         "chat_message_channel");
182     return new KafkaConsumer<>(
183         properties,
184         stringDeserializer,
185         messageDeserializer);
186   }
187
188   @Bean
189   StringDeserializer stringDeserializer()
190   {
191     return new StringDeserializer();
192   }
193
194   @Bean
195   JsonDeserializer<MessageTo> chatMessageDeserializer()
196   {
197     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
198     return deserializer;
199   }
200
201   @Bean
202   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
203   {
204     Properties properties = new Properties();
205     properties.setProperty(
206         ProducerConfig.CLIENT_ID_CONFIG,
207         chatBackendProperties.getKafka().getClientId());
208     properties.setProperty(
209         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
210         chatBackendProperties.getKafka().getBootstrapServers());
211     return properties;
212   }
213
214   @Bean
215   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
216   {
217     Properties properties = new Properties();
218     properties.setProperty(
219         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
220         chatBackendProperties.getKafka().getBootstrapServers());
221     properties.setProperty(
222         ConsumerConfig.CLIENT_ID_CONFIG,
223         chatBackendProperties.getKafka().getClientId());
224     properties.setProperty(
225         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
226         "false");
227     properties.setProperty(
228         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
229         "earliest");
230     return properties;
231   }
232
233   @Bean
234   ZoneId zoneId()
235   {
236     return ZoneId.systemDefault();
237   }
238 }