refactor: Introduced `ConsumerTaskExecutor` -- Aligned code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.StringDeserializer;
15 import org.apache.kafka.common.serialization.StringSerializer;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.context.annotation.Configuration;
19 import org.springframework.kafka.support.serializer.JsonDeserializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
22
23 import java.time.Clock;
24 import java.time.ZoneId;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30
31 @ConditionalOnProperty(
32     prefix = "chat.backend",
33     name = "services",
34     havingValue = "kafka")
35 @Configuration
36 public class KafkaServicesConfiguration
37 {
38   @Bean
39   ConsumerTaskExecutor chatRoomChannelTaskExecutor(
40       ThreadPoolTaskExecutor taskExecutor,
41       ChatRoomChannel chatRoomChannel,
42       Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
43       ConsumerTaskExecutor.WorkAssignor workAssignor)
44   {
45     return new ConsumerTaskExecutor(
46         taskExecutor,
47         chatRoomChannel,
48         chatRoomChannelConsumer,
49         workAssignor);
50   }
51
52   @Bean
53   ConsumerTaskExecutor.WorkAssignor workAssignor(
54       ChatBackendProperties properties,
55       ChatRoomChannel chatRoomChannel)
56   {
57     return consumer ->
58     {
59       List<String> topics =
60           List.of(properties.getKafka().getChatRoomChannelTopic());
61       consumer.subscribe(topics, chatRoomChannel);
62     };
63   }
64
65   @Bean
66     ChatHomeService kafkaChatHome(
67       ChatBackendProperties properties,
68       ChatRoomChannel chatRoomChannel)
69   {
70     return new KafkaChatHomeService(
71         properties.getKafka().getNumPartitions(),
72         chatRoomChannel);
73   }
74
75   @Bean
76   ChatRoomChannel chatRoomChannel(
77       ChatBackendProperties properties,
78       Producer<String, AbstractMessageTo> chatRoomChannelProducer,
79       Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
80       ZoneId zoneId,
81       Clock clock)
82   {
83     return new ChatRoomChannel(
84         properties.getKafka().getChatRoomChannelTopic(),
85         chatRoomChannelProducer,
86         chatRoomChannelConsumer,
87         zoneId,
88         properties.getKafka().getNumPartitions(),
89         properties.getChatroomBufferSize(),
90         clock);
91   }
92
93   @Bean
94   Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
95       Properties defaultProducerProperties,
96       ChatBackendProperties chatBackendProperties,
97       StringSerializer stringSerializer,
98       JsonSerializer<AbstractMessageTo> messageSerializer)
99   {
100     Map<String, Object> properties = new HashMap<>();
101     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
102     properties.put(
103         ProducerConfig.CLIENT_ID_CONFIG,
104         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
105     return new KafkaProducer<>(
106         properties,
107         stringSerializer,
108         messageSerializer);
109   }
110
111   @Bean
112   StringSerializer stringSerializer()
113   {
114     return new StringSerializer();
115   }
116
117   @Bean
118   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
119   {
120     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
121     serializer.configure(
122         Map.of(
123             JsonSerializer.TYPE_MAPPINGS, typeMappings),
124         false);
125     return serializer;
126   }
127
128   @Bean
129   Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
130       Properties defaultConsumerProperties,
131       ChatBackendProperties chatBackendProperties,
132       StringDeserializer stringDeserializer,
133       JsonDeserializer<AbstractMessageTo> messageDeserializer)
134   {
135     Map<String, Object> properties = new HashMap<>();
136     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
137     properties.put(
138         ConsumerConfig.CLIENT_ID_CONFIG,
139         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
140     properties.put(
141         ConsumerConfig.GROUP_ID_CONFIG,
142         "chatroom_channel");
143     return new KafkaConsumer<>(
144         properties,
145         stringDeserializer,
146         messageDeserializer);
147   }
148
149   @Bean
150   StringDeserializer stringDeserializer()
151   {
152     return new StringDeserializer();
153   }
154
155   @Bean
156   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
157   {
158     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
159     deserializer.configure(
160         Map.of(
161             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
162             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
163         false );
164     return deserializer;
165   }
166
167   @Bean
168   String typeMappings ()
169   {
170     return
171         "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
172         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
173   }
174
175   @Bean
176   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
177   {
178     Properties properties = new Properties();
179     properties.setProperty(
180         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
181         chatBackendProperties.getKafka().getBootstrapServers());
182     return properties;
183   }
184
185   @Bean
186   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
187   {
188     Properties properties = new Properties();
189     properties.setProperty(
190         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
191         chatBackendProperties.getKafka().getBootstrapServers());
192     properties.setProperty(
193         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
194         "false");
195     properties.setProperty(
196         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
197         "earliest");
198     return properties;
199   }
200
201   @Bean
202   ZoneId zoneId()
203   {
204     return ZoneId.systemDefault();
205   }
206 }