NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
index 91115ea..efc86b9 100644 (file)
@@ -1,21 +1,36 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 
 
 @ConditionalOnProperty(
@@ -23,65 +38,258 @@ import java.time.Clock;
     name = "services",
     havingValue = "kafka")
 @Configuration
+@Slf4j
 public class KafkaServicesConfiguration implements ApplicationRunner
 {
-  @Bean
-  ChatHome kafkaChatHome(
-      ChatBackendProperties properties,
-      KafkaChatHomeService chatHomeService)
-  {
-    int numShards = properties.getInmemory().getNumShards();
-    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
-    for (int shard = 0; shard < numShards; shard++)
-    {
+  @Autowired
+  ThreadPoolTaskExecutor taskExecutor;
+  @Autowired
+  ConfigurableApplicationContext context;
+
+  @Autowired
+  ChatMessageChannel chatMessageChannel;
+  @Autowired
+  ChatRoomChannel chatRoomChannel;
+
+  CompletableFuture<Void> chatRoomChannelConsumerJob;
+  CompletableFuture<Void> chatMessageChannelConsumerJob;
 
-    }
-        .read()
-        .subscribe(chatRoom ->
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    log.info("Starting the consumer for the ChatRoomChannel");
+    chatRoomChannelConsumerJob = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
         {
-          int shard = chatRoom.getShard();
-          if (chatHomes[shard] == null)
-            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
         });
-    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHome(chatHomes, strategy);
+    log.info("Starting the consumer for the ChatMessageChannel");
+    chatMessageChannelConsumerJob = taskExecutor
+        .submitCompletable(chatMessageChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinChatRoomChannelConsumerJob()
+  {
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    chatRoomChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+
+  @PreDestroy
+  public void joinChatMessageChannelConsumerJob()
+  {
+    log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
+    chatMessageChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatMessageChannel");
+  }
+
+
+  @Bean
+  ChatHome kafkaChatHome(
+      ShardingStrategy shardingStrategy,
+      ChatMessageChannel chatMessageChannel)
+  {
+    return new KafkaChatHome(shardingStrategy, chatMessageChannel);
   }
 
   @Bean
-  KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
   {
-    ShardingStrategyType sharding =
-        properties.getInmemory().getShardingStrategy();
-    int numShards = sharding == ShardingStrategyType.none
-        ? 1
-        : properties.getInmemory().getNumShards();
-    int[] ownedShards = sharding == ShardingStrategyType.none
-        ? new int[] { 0 }
-        : properties.getInmemory().getOwnedShards();
-    return new InMemoryChatHomeService(
-        numShards,
-        ownedShards,
-        storageStrategy.read());
+    return new KafkaChatRoomFactory(chatRoomChannel);
   }
 
   @Bean
-  InMemoryChatRoomFactory chatRoomFactory(
-      InMemoryChatHomeService service,
-      ShardingStrategy strategy,
-      Clock clock,
-      ChatBackendProperties properties)
+  ChatRoomChannel chatRoomChannel(
+      ChatBackendProperties properties,
+      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+      ShardingStrategy shardingStrategy,
+      ChatMessageChannel chatMessageChannel,
+      Clock clock)
   {
-    return new InMemoryChatRoomFactory(
-        service,
-        strategy,
+    return new ChatRoomChannel(
+        properties.getKafka().getTopic(),
+        chatRoomChannelProducer,
+        chatRoomChannelConsumer,
+        shardingStrategy,
+        chatMessageChannel,
         clock,
         properties.getChatroomBufferSize());
   }
 
   @Bean
-  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+      Properties defaultProducerProperties,
+      IntegerSerializer integerSerializer,
+      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+  {
+    return new KafkaProducer<>(
+        defaultProducerProperties,
+        integerSerializer,
+        chatRoomSerializer);
+  }
+
+  @Bean
+  IntegerSerializer integerSerializer()
+  {
+    return new IntegerSerializer();
+  }
+
+  @Bean
+  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  {
+    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    return serializer;
+  }
+
+  @Bean
+  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+      Properties defaultConsumerProperties,
+      IntegerDeserializer integerDeserializer,
+      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+  {
+    Properties properties = new Properties(defaultConsumerProperties);
+    properties.setProperty(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_room_channel");
+    return new KafkaConsumer<>(
+        properties,
+        integerDeserializer,
+        chatRoomDeserializer);
+  }
+
+  @Bean
+  IntegerDeserializer integerDeserializer()
+  {
+    return new IntegerDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  {
+    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    return deserializer;
+  }
+
+  @Bean
+  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
   {
-    return new KafkaLikeShardingStrategy(
+    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+  }
+
+  @Bean
+  ChatMessageChannel chatMessageChannel(
+      ChatBackendProperties properties,
+      Producer<String, MessageTo> chatMessageChannelProducer,
+      Consumer<String, MessageTo> chatMessageChannelConsumer,
+      ZoneId zoneId)
+  {
+    return new ChatMessageChannel(
+        properties.getKafka().getTopic(),
+        chatMessageChannelProducer,
+        chatMessageChannelConsumer,
+        zoneId,
         properties.getKafka().getNumPartitions());
   }
+
+  @Bean
+  Producer<String, MessageTo>  chatMessageChannelProducer(
+      Properties defaultProducerProperties,
+      StringSerializer stringSerializer,
+      JsonSerializer<MessageTo> messageSerializer)
+  {
+    return new KafkaProducer<>(
+        defaultProducerProperties,
+        stringSerializer,
+        messageSerializer);
+  }
+
+  @Bean
+  StringSerializer stringSerializer()
+  {
+    return new StringSerializer();
+  }
+
+  @Bean
+  JsonSerializer<MessageTo> chatMessageSerializer()
+  {
+    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    return serializer;
+  }
+
+  @Bean
+  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+      Properties defaultConsumerProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<MessageTo> messageDeserializer)
+  {
+    Properties properties = new Properties(defaultConsumerProperties);
+    properties.setProperty(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_message_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  StringDeserializer stringDeserializer()
+  {
+    return new StringDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  {
+    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    return deserializer;
+  }
+
+  @Bean
+  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientId());
+    properties.setProperty(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    return properties;
+  }
+
+  @Bean
+  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    properties.setProperty(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientId());
+    properties.setProperty(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        "false");
+    properties.setProperty(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        "earliest");
+    return properties;
+  }
+
+  @Bean
+  ZoneId zoneId()
+  {
+    return ZoneId.systemDefault();
+  }
 }