1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHome;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.StringDeserializer;
15 import org.apache.kafka.common.serialization.StringSerializer;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.context.annotation.Configuration;
19 import org.springframework.kafka.support.serializer.JsonDeserializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import java.time.Clock;
23 import java.time.ZoneId;
24 import java.util.HashMap;
26 import java.util.Properties;
29 @ConditionalOnProperty(
30 prefix = "chat.backend",
32 havingValue = "kafka")
34 public class KafkaServicesConfiguration
37 ChatHome kafkaChatHome(
38 ChatBackendProperties properties,
39 ChatRoomChannel chatRoomChannel)
41 return new KafkaChatHome(
42 properties.getKafka().getNumPartitions(),
47 KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
49 return new KafkaChatRoomFactory(chatRoomChannel);
53 ChatRoomChannel chatRoomChannel(
54 ChatBackendProperties properties,
55 Producer<String, AbstractMessageTo> chatRoomChannelProducer,
56 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
60 return new ChatRoomChannel(
61 properties.getKafka().getChatRoomChannelTopic(),
62 chatRoomChannelProducer,
63 chatRoomChannelConsumer,
65 properties.getKafka().getNumPartitions(),
66 properties.getChatroomBufferSize(),
71 Producer<String, AbstractMessageTo> chatRoomChannelProducer(
72 Properties defaultProducerProperties,
73 ChatBackendProperties chatBackendProperties,
74 StringSerializer stringSerializer,
75 JsonSerializer<AbstractMessageTo> messageSerializer)
77 Map<String, Object> properties = new HashMap<>();
78 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
80 ProducerConfig.CLIENT_ID_CONFIG,
81 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
82 return new KafkaProducer<>(
89 StringSerializer stringSerializer()
91 return new StringSerializer();
95 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
97 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
100 JsonSerializer.TYPE_MAPPINGS, typeMappings),
106 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
107 Properties defaultConsumerProperties,
108 ChatBackendProperties chatBackendProperties,
109 StringDeserializer stringDeserializer,
110 JsonDeserializer<AbstractMessageTo> messageDeserializer)
112 Map<String, Object> properties = new HashMap<>();
113 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
115 ConsumerConfig.CLIENT_ID_CONFIG,
116 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
118 ConsumerConfig.GROUP_ID_CONFIG,
120 return new KafkaConsumer<>(
123 messageDeserializer);
127 StringDeserializer stringDeserializer()
129 return new StringDeserializer();
133 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
135 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
136 deserializer.configure(
138 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
139 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
145 String typeMappings ()
148 "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
149 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
153 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
155 Properties properties = new Properties();
156 properties.setProperty(
157 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
158 chatBackendProperties.getKafka().getBootstrapServers());
163 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
165 Properties properties = new Properties();
166 properties.setProperty(
167 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
168 chatBackendProperties.getKafka().getBootstrapServers());
169 properties.setProperty(
170 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
172 properties.setProperty(
173 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
181 return ZoneId.systemDefault();