WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 18:54:27 +0000 (20:54 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 20:46:02 +0000 (22:46 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index 018dc65..18611d4 100644 (file)
@@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -12,6 +13,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -64,25 +66,41 @@ public class KafkaServicesConfiguration
   @Bean
   ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
+      InfoChannel infoChannel,
       DataChannel dataChannel)
   {
     return new KafkaChatHomeService(
         properties.getKafka().getNumPartitions(),
+        infoChannel,
         dataChannel);
   }
 
   @Bean
-  DataChannel chatRoomChannel(
+  InfoChannel infoChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      DataChannel dataChannel)
+  {
+    return new InfoChannel(
+        properties.getKafka().getInfoChannelTopic(),
+        producer,
+        infoChannelConsumer,
+        dataChannel);
+  }
+
+  @Bean
+  DataChannel dataChannel(
+      ChatBackendProperties properties,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
     return new DataChannel(
         properties.getKafka().getDataChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
+        producer,
+        dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
@@ -90,7 +108,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+  Producer<String, AbstractMessageTo>  producer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
@@ -100,7 +118,7 @@ public class KafkaServicesConfiguration
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
     return new KafkaProducer<>(
         properties,
         stringSerializer,
@@ -125,7 +143,28 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
+  Consumer<String, AbstractMessageTo>  infoChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<AbstractMessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "info_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  Consumer<String, AbstractMessageTo>  dataChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
@@ -135,10 +174,10 @@ public class KafkaServicesConfiguration
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
-        "chatroom_channel");
+        "data_channel");
     return new KafkaConsumer<>(
         properties,
         stringDeserializer,
@@ -167,7 +206,7 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }