feat: first runnable implementation, that is based on Kafka
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.StringDeserializer;
15 import org.apache.kafka.common.serialization.StringSerializer;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.context.annotation.Configuration;
19 import org.springframework.kafka.support.serializer.JsonDeserializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21
22 import java.time.Clock;
23 import java.time.ZoneId;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Properties;
27
28
29 @ConditionalOnProperty(
30     prefix = "chat.backend",
31     name = "services",
32     havingValue = "kafka")
33 @Configuration
34 public class KafkaServicesConfiguration
35 {
36   @Bean
37   ChatHome kafkaChatHome(
38       ChatBackendProperties properties,
39       ChatRoomChannel chatRoomChannel)
40   {
41     return new KafkaChatHome(
42         properties.getKafka().getNumPartitions(),
43         chatRoomChannel);
44   }
45
46   @Bean
47   KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
48   {
49     return new KafkaChatRoomFactory(chatRoomChannel);
50   }
51
52   @Bean
53   ChatRoomChannel chatRoomChannel(
54       ChatBackendProperties properties,
55       Producer<String, AbstractMessageTo> chatRoomChannelProducer,
56       Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
57       ZoneId zoneId,
58       Clock clock)
59   {
60     return new ChatRoomChannel(
61         properties.getKafka().getChatRoomChannelTopic(),
62         chatRoomChannelProducer,
63         chatRoomChannelConsumer,
64         zoneId,
65         properties.getKafka().getNumPartitions(),
66         properties.getChatroomBufferSize(),
67         clock);
68   }
69
70   @Bean
71   Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
72       Properties defaultProducerProperties,
73       ChatBackendProperties chatBackendProperties,
74       StringSerializer stringSerializer,
75       JsonSerializer<AbstractMessageTo> messageSerializer)
76   {
77     Map<String, Object> properties = new HashMap<>();
78     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
79     properties.put(
80         ProducerConfig.CLIENT_ID_CONFIG,
81         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
82     return new KafkaProducer<>(
83         properties,
84         stringSerializer,
85         messageSerializer);
86   }
87
88   @Bean
89   StringSerializer stringSerializer()
90   {
91     return new StringSerializer();
92   }
93
94   @Bean
95   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
96   {
97     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
98     serializer.configure(
99         Map.of(
100             JsonSerializer.TYPE_MAPPINGS, typeMappings),
101         false);
102     return serializer;
103   }
104
105   @Bean
106   Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
107       Properties defaultConsumerProperties,
108       ChatBackendProperties chatBackendProperties,
109       StringDeserializer stringDeserializer,
110       JsonDeserializer<AbstractMessageTo> messageDeserializer)
111   {
112     Map<String, Object> properties = new HashMap<>();
113     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
114     properties.put(
115         ConsumerConfig.CLIENT_ID_CONFIG,
116         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
117     properties.put(
118         ConsumerConfig.GROUP_ID_CONFIG,
119         "chatroom_channel");
120     return new KafkaConsumer<>(
121         properties,
122         stringDeserializer,
123         messageDeserializer);
124   }
125
126   @Bean
127   StringDeserializer stringDeserializer()
128   {
129     return new StringDeserializer();
130   }
131
132   @Bean
133   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
134   {
135     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
136     deserializer.configure(
137         Map.of(
138             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
139             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
140         false );
141     return deserializer;
142   }
143
144   @Bean
145   String typeMappings ()
146   {
147     return
148         "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
149         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
150   }
151
152   @Bean
153   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
154   {
155     Properties properties = new Properties();
156     properties.setProperty(
157         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
158         chatBackendProperties.getKafka().getBootstrapServers());
159     return properties;
160   }
161
162   @Bean
163   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
164   {
165     Properties properties = new Properties();
166     properties.setProperty(
167         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
168         chatBackendProperties.getKafka().getBootstrapServers());
169     properties.setProperty(
170         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
171         "false");
172     properties.setProperty(
173         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
174         "earliest");
175     return properties;
176   }
177
178   @Bean
179   ZoneId zoneId()
180   {
181     return ZoneId.systemDefault();
182   }
183 }