refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.StringDeserializer;
15 import org.apache.kafka.common.serialization.StringSerializer;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.context.annotation.Configuration;
19 import org.springframework.kafka.support.serializer.JsonDeserializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21
22 import java.time.Clock;
23 import java.time.ZoneId;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Properties;
27
28
29 @ConditionalOnProperty(
30     prefix = "chat.backend",
31     name = "services",
32     havingValue = "kafka")
33 @Configuration
34 public class KafkaServicesConfiguration
35 {
36   @Bean
37   ChatHome kafkaChatHome(
38       ChatBackendProperties properties,
39       ChatRoomChannel chatRoomChannel)
40   {
41     return new KafkaChatHome(
42         properties.getKafka().getNumPartitions(),
43         chatRoomChannel);
44   }
45
46   @Bean
47   ChatRoomChannel chatRoomChannel(
48       ChatBackendProperties properties,
49       Producer<String, AbstractMessageTo> chatRoomChannelProducer,
50       Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
51       ZoneId zoneId,
52       Clock clock)
53   {
54     return new ChatRoomChannel(
55         properties.getKafka().getChatRoomChannelTopic(),
56         chatRoomChannelProducer,
57         chatRoomChannelConsumer,
58         zoneId,
59         properties.getKafka().getNumPartitions(),
60         properties.getChatroomBufferSize(),
61         clock);
62   }
63
64   @Bean
65   Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
66       Properties defaultProducerProperties,
67       ChatBackendProperties chatBackendProperties,
68       StringSerializer stringSerializer,
69       JsonSerializer<AbstractMessageTo> messageSerializer)
70   {
71     Map<String, Object> properties = new HashMap<>();
72     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
73     properties.put(
74         ProducerConfig.CLIENT_ID_CONFIG,
75         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
76     return new KafkaProducer<>(
77         properties,
78         stringSerializer,
79         messageSerializer);
80   }
81
82   @Bean
83   StringSerializer stringSerializer()
84   {
85     return new StringSerializer();
86   }
87
88   @Bean
89   JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
90   {
91     JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
92     serializer.configure(
93         Map.of(
94             JsonSerializer.TYPE_MAPPINGS, typeMappings),
95         false);
96     return serializer;
97   }
98
99   @Bean
100   Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
101       Properties defaultConsumerProperties,
102       ChatBackendProperties chatBackendProperties,
103       StringDeserializer stringDeserializer,
104       JsonDeserializer<AbstractMessageTo> messageDeserializer)
105   {
106     Map<String, Object> properties = new HashMap<>();
107     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
108     properties.put(
109         ConsumerConfig.CLIENT_ID_CONFIG,
110         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
111     properties.put(
112         ConsumerConfig.GROUP_ID_CONFIG,
113         "chatroom_channel");
114     return new KafkaConsumer<>(
115         properties,
116         stringDeserializer,
117         messageDeserializer);
118   }
119
120   @Bean
121   StringDeserializer stringDeserializer()
122   {
123     return new StringDeserializer();
124   }
125
126   @Bean
127   JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
128   {
129     JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
130     deserializer.configure(
131         Map.of(
132             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
133             JsonDeserializer.TYPE_MAPPINGS, typeMappings),
134         false );
135     return deserializer;
136   }
137
138   @Bean
139   String typeMappings ()
140   {
141     return
142         "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
143         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
144   }
145
146   @Bean
147   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
148   {
149     Properties properties = new Properties();
150     properties.setProperty(
151         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
152         chatBackendProperties.getKafka().getBootstrapServers());
153     return properties;
154   }
155
156   @Bean
157   Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
158   {
159     Properties properties = new Properties();
160     properties.setProperty(
161         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
162         chatBackendProperties.getKafka().getBootstrapServers());
163     properties.setProperty(
164         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
165         "false");
166     properties.setProperty(
167         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
168         "earliest");
169     return properties;
170   }
171
172   @Bean
173   ZoneId zoneId()
174   {
175     return ZoneId.systemDefault();
176   }
177 }