b7eb711a68f872a95cb8ab8fffc6045a05ae19df
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.clients.producer.ProducerConfig;
13 import org.apache.kafka.common.serialization.IntegerDeserializer;
14 import org.apache.kafka.common.serialization.IntegerSerializer;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18 import org.springframework.context.annotation.Bean;
19 import org.springframework.context.annotation.Configuration;
20 import org.springframework.kafka.support.serializer.JsonDeserializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.Properties;
26
27
28 @ConditionalOnProperty(
29     prefix = "chat.backend",
30     name = "services",
31     havingValue = "kafka")
32 @Configuration
33 public class KafkaServicesConfiguration
34 {
35   @Bean
36   ChatHome kafkaChatHome(
37       ShardingStrategy shardingStrategy,
38       ChatMessageChannel chatMessageChannel)
39   {
40     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
41   }
42
43   @Bean
44   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
45   {
46     return new KafkaChatRoomFactory(chatRoomChannel);
47   }
48
49   @Bean
50   ChatRoomChannel chatRoomChannel(
51       ChatBackendProperties properties,
52       Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
53       Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
54       ShardingStrategy shardingStrategy,
55       ChatMessageChannel chatMessageChannel,
56       Clock clock)
57   {
58     return new ChatRoomChannel(
59         properties.getKafka().getTopic(),
60         chatRoomChannelProducer,
61         chatRoomChannelConsumer,
62         shardingStrategy,
63         chatMessageChannel,
64         clock,
65         properties.getChatroomBufferSize());
66   }
67
68   @Bean
69   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
70       Properties defaultProducerProperties,
71       IntegerSerializer integerSerializer,
72       JsonSerializer<ChatRoomTo> chatRoomSerializer)
73   {
74     return new KafkaProducer<>(
75         defaultProducerProperties,
76         integerSerializer,
77         chatRoomSerializer);
78   }
79
80   @Bean
81   IntegerSerializer integerSerializer()
82   {
83     return new IntegerSerializer();
84   }
85
86   @Bean
87   JsonSerializer<ChatRoomTo> chatRoomSerializer()
88   {
89     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
90     return serializer;
91   }
92
93   @Bean
94   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
95       Properties defaultConsumerProperties,
96       IntegerDeserializer integerDeserializer,
97       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
98   {
99     Properties properties = new Properties(defaultConsumerProperties);
100     properties.setProperty(
101         ConsumerConfig.GROUP_ID_CONFIG,
102         "chat_room_channel");
103     return new KafkaConsumer<>(
104         properties,
105         integerDeserializer,
106         chatRoomDeserializer);
107   }
108
109   @Bean
110   IntegerDeserializer integerDeserializer()
111   {
112     return new IntegerDeserializer();
113   }
114
115   @Bean
116   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
117   {
118     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
119     return deserializer;
120   }
121
122   @Bean
123   ShardingStrategy shardingStrategy(ChatBackendProperties properties)
124   {
125     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
126   }
127
128   @Bean
129   ChatMessageChannel chatMessageChannel(
130       ChatBackendProperties properties,
131       Producer<String, MessageTo> chatMessageChannelProducer,
132       Consumer<String, MessageTo> chatMessageChannelConsumer,
133       ZoneId zoneId)
134   {
135     return new ChatMessageChannel(
136         properties.getKafka().getTopic(),
137         chatMessageChannelProducer,
138         chatMessageChannelConsumer,
139         zoneId,
140         properties.getKafka().getNumPartitions());
141   }
142
143   @Bean
144   Producer<String, MessageTo>  chatMessageChannelProducer(
145       Properties defaultProducerProperties,
146       StringSerializer stringSerializer,
147       JsonSerializer<MessageTo> messageSerializer)
148   {
149     return new KafkaProducer<>(
150         defaultProducerProperties,
151         stringSerializer,
152         messageSerializer);
153   }
154
155   @Bean
156   StringSerializer stringSerializer()
157   {
158     return new StringSerializer();
159   }
160
161   @Bean
162   JsonSerializer<MessageTo> chatMessageSerializer()
163   {
164     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
165     return serializer;
166   }
167
168   @Bean
169   Consumer<String, MessageTo>  chatMessageChannelConsumer(
170       Properties defaultConsumerProperties,
171       StringDeserializer stringDeserializer,
172       JsonDeserializer<MessageTo> messageDeserializer)
173   {
174     Properties properties = new Properties(defaultConsumerProperties);
175     properties.setProperty(
176         ConsumerConfig.GROUP_ID_CONFIG,
177         "chat_message_channel");
178     return new KafkaConsumer<>(
179         properties,
180         stringDeserializer,
181         messageDeserializer);
182   }
183
184   @Bean
185   StringDeserializer stringDeserializer()
186   {
187     return new StringDeserializer();
188   }
189
190   @Bean
191   JsonDeserializer<MessageTo> chatMessageDeserializer()
192   {
193     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
194     return deserializer;
195   }
196
197   @Bean
198   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
199   {
200     Properties properties = new Properties();
201     properties.setProperty(
202         ProducerConfig.CLIENT_ID_CONFIG,
203         chatBackendProperties.getKafka().getClientId());
204     properties.setProperty(
205         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
206         chatBackendProperties.getKafka().getBootstrapServers());
207     return properties;
208   }
209
210   @Bean
211   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
212   {
213     Properties properties = new Properties();
214     properties.setProperty(
215         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
216         chatBackendProperties.getKafka().getBootstrapServers());
217     properties.setProperty(
218         ConsumerConfig.CLIENT_ID_CONFIG,
219         chatBackendProperties.getKafka().getClientId());
220     properties.setProperty(
221         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
222         "false");
223     properties.setProperty(
224         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
225         "earliest");
226     return properties;
227   }
228
229   @Bean
230   ZoneId zoneId()
231   {
232     return ZoneId.systemDefault();
233   }
234 }