1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.KafkaProducer;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.serialization.StringDeserializer;
15 import org.apache.kafka.common.serialization.StringSerializer;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.context.annotation.Configuration;
19 import org.springframework.kafka.support.serializer.JsonDeserializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import java.time.Clock;
23 import java.time.ZoneId;
24 import java.util.HashMap;
26 import java.util.Properties;
29 @ConditionalOnProperty(
30 prefix = "chat.backend",
32 havingValue = "kafka")
34 public class KafkaServicesConfiguration
37 ChatHomeService kafkaChatHome(
38 ChatBackendProperties properties,
39 ChatRoomChannel chatRoomChannel)
41 return new KafkaChatHomeService(
42 properties.getKafka().getNumPartitions(),
47 ChatRoomChannel chatRoomChannel(
48 ChatBackendProperties properties,
49 Producer<String, AbstractMessageTo> chatRoomChannelProducer,
50 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
54 return new ChatRoomChannel(
55 properties.getKafka().getChatRoomChannelTopic(),
56 chatRoomChannelProducer,
57 chatRoomChannelConsumer,
59 properties.getKafka().getNumPartitions(),
60 properties.getChatroomBufferSize(),
65 Producer<String, AbstractMessageTo> chatRoomChannelProducer(
66 Properties defaultProducerProperties,
67 ChatBackendProperties chatBackendProperties,
68 StringSerializer stringSerializer,
69 JsonSerializer<AbstractMessageTo> messageSerializer)
71 Map<String, Object> properties = new HashMap<>();
72 defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
74 ProducerConfig.CLIENT_ID_CONFIG,
75 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
76 return new KafkaProducer<>(
83 StringSerializer stringSerializer()
85 return new StringSerializer();
89 JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
91 JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
94 JsonSerializer.TYPE_MAPPINGS, typeMappings),
100 Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
101 Properties defaultConsumerProperties,
102 ChatBackendProperties chatBackendProperties,
103 StringDeserializer stringDeserializer,
104 JsonDeserializer<AbstractMessageTo> messageDeserializer)
106 Map<String, Object> properties = new HashMap<>();
107 defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
109 ConsumerConfig.CLIENT_ID_CONFIG,
110 chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
112 ConsumerConfig.GROUP_ID_CONFIG,
114 return new KafkaConsumer<>(
117 messageDeserializer);
121 StringDeserializer stringDeserializer()
123 return new StringDeserializer();
127 JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
129 JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
130 deserializer.configure(
132 JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
133 JsonDeserializer.TYPE_MAPPINGS, typeMappings),
139 String typeMappings ()
142 "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
143 "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
147 Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
149 Properties properties = new Properties();
150 properties.setProperty(
151 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
152 chatBackendProperties.getKafka().getBootstrapServers());
157 Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
159 Properties properties = new Properties();
160 properties.setProperty(
161 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
162 chatBackendProperties.getKafka().getBootstrapServers());
163 properties.setProperty(
164 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
166 properties.setProperty(
167 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
175 return ZoneId.systemDefault();