package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
@ConditionalOnProperty(
prefix = "chat.backend",
name = "services",
- havingValue = "inmemory",
- matchIfMissing = true)
+ havingValue = "kafka")
@Configuration
+@Slf4j
public class KafkaServicesConfiguration implements ApplicationRunner
{
+ @Autowired
+ ThreadPoolTaskExecutor taskExecutor;
+ @Autowired
+ ConfigurableApplicationContext context;
+
+ @Autowired
+ ChatMessageChannel chatMessageChannel;
+ @Autowired
+ ChatRoomChannel chatRoomChannel;
+
+ CompletableFuture<Void> chatRoomChannelConsumerJob;
+ CompletableFuture<Void> chatMessageChannelConsumerJob;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ log.info("Starting the consumer for the ChatRoomChannel");
+ chatRoomChannelConsumerJob = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ log.info("Starting the consumer for the ChatMessageChannel");
+ chatMessageChannelConsumerJob = taskExecutor
+ .submitCompletable(chatMessageChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ @PreDestroy
+ public void joinChatRoomChannelConsumerJob()
+ {
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ chatRoomChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+
+ @PreDestroy
+ public void joinChatMessageChannelConsumerJob()
+ {
+ log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
+ chatMessageChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatMessageChannel");
+ }
+
+
@Bean
ChatHome kafkaChatHome(
- ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel)
{
- int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new KafkaChatHome(shardingStrategy, chatMessageChannel);
}
@Bean
- KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+ KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
{
- ShardingStrategyType sharding =
- properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
- return new InMemoryChatHomeService(
- numShards,
- ownedShards,
- storageStrategy.read());
+ return new KafkaChatRoomFactory(chatRoomChannel);
}
@Bean
- InMemoryChatRoomFactory chatRoomFactory(
- InMemoryChatHomeService service,
- ShardingStrategy strategy,
- Clock clock,
- ChatBackendProperties properties)
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel,
+ Clock clock)
{
- return new InMemoryChatRoomFactory(
- service,
- strategy,
+ return new ChatRoomChannel(
+ properties.getKafka().getTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ shardingStrategy,
+ chatMessageChannel,
clock,
properties.getChatroomBufferSize());
}
@Bean
- ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Properties defaultProducerProperties,
+ IntegerSerializer integerSerializer,
+ JsonSerializer<ChatRoomTo> chatRoomSerializer)
{
- return new KafkaLikeShardingStrategy(
+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ integerSerializer,
+ chatRoomSerializer);
+ }
+
+ @Bean
+ IntegerSerializer integerSerializer()
+ {
+ return new IntegerSerializer();
+ }
+
+ @Bean
+ JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ {
+ JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Properties defaultConsumerProperties,
+ IntegerDeserializer integerDeserializer,
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ {
+ Properties properties = new Properties(defaultConsumerProperties);
+ properties.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_room_channel");
+ return new KafkaConsumer<>(
+ properties,
+ integerDeserializer,
+ chatRoomDeserializer);
+ }
+
+ @Bean
+ IntegerDeserializer integerDeserializer()
+ {
+ return new IntegerDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ {
+ JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
+ @Bean
+ ChatMessageChannel chatMessageChannel(
+ ChatBackendProperties properties,
+ Producer<String, MessageTo> chatMessageChannelProducer,
+ Consumer<String, MessageTo> chatMessageChannelConsumer,
+ ZoneId zoneId)
+ {
+ return new ChatMessageChannel(
+ properties.getKafka().getTopic(),
+ chatMessageChannelProducer,
+ chatMessageChannelConsumer,
+ zoneId,
properties.getKafka().getNumPartitions());
}
+
+ @Bean
+ Producer<String, MessageTo> chatMessageChannelProducer(
+ Properties defaultProducerProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<MessageTo> messageSerializer)
+ {
+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<MessageTo> chatMessageSerializer()
+ {
+ JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Properties defaultConsumerProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<MessageTo> messageDeserializer)
+ {
+ Properties properties = new Properties(defaultConsumerProperties);
+ properties.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_message_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<MessageTo> chatMessageDeserializer()
+ {
+ JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId());
+ properties.setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ return properties;
+ }
+
+ @Bean
+ Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ properties.setProperty(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId());
+ properties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false");
+ properties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");
+ return properties;
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
}