WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 17:47:35 +0000 (19:47 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 20:44:25 +0000 (22:44 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java

index 4c9abd1..8598464 100644 (file)
@@ -212,13 +212,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       switch (record.value().getType())
       {
-        case COMMAND_CREATE_CHATROOM:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
         case EVENT_CHATMESSAGE_RECEIVED:
           Instant instant = Instant.ofEpochSecond(record.timestamp());
           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
@@ -242,31 +235,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
+  void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
+    if (!isShardOwned[chatRoomInfo.getShard()])
+    {
+      log.debug("Ignoring not owned chat-room {}", chatRoomInfo);
+      return;
+    }
 
-  private void createChatRoom(ChatRoomInfo chatRoomInfo)
-  {
     UUID id = chatRoomInfo.getId();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
     KafkaChatMessageService service = new KafkaChatMessageService(this, id);
index 77f0cbb..97739b5 100644 (file)
@@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -29,8 +30,7 @@ import java.util.stream.IntStream;
 @Slf4j
 public class InfoChannel implements Runnable
 {
-  private final String infoTopic;
-  private final String dataTopic;
+  private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
@@ -39,16 +39,14 @@ public class InfoChannel implements Runnable
 
 
   public InfoChannel(
-    String infoTopic,
-    String dataTopic,
+    String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> infoChannelConsumer)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
-        infoTopic);
-    this.infoTopic = infoTopic;
-    this.dataTopic = dataTopic;
+        topic);
+    this.topic = topic;
     this.consumer = infoChannelConsumer;
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
@@ -56,16 +54,16 @@ public class InfoChannel implements Runnable
 
 
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+  Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name);
+    EventChatRoomCreated to = EventChatRoomCreated.of(name);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
-              dataTopic,
+              topic<,
               chatRoomId.toString(),
               to);
 
@@ -135,15 +133,15 @@ public class InfoChannel implements Runnable
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractInfoMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
   {
-    for (ConsumerRecord<String, AbstractInfoMessageTo> record : records)
+    for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
       UUID chatRoomId = UUID.fromString(record.key());
 
       switch (record.value().getType())
       {
-        case COMMAND_CREATE_CHATROOM:
+        case EVENT_CHATROOM_CREATED:
           createChatRoom(
               chatRoomId,
               (CommandCreateChatRoomTo) record.value(),
@@ -239,7 +237,7 @@ public class InfoChannel implements Runnable
     consumer.pause(IntStream
         .range(0, numShards)
         .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(infoTopic, shard))
+        .mapToObj(shard -> new TopicPartition(topic, shard))
         .toList());
   }
 
index 73990e6..707e843 100644 (file)
@@ -27,7 +27,7 @@ public class KafkaChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Sending create-command for chat rooom: id={}, name={}");
-    return infoChannel.sendCreateChatRoomRequest(id, name);
+    return infoChannel.sendChatRoomCreatedEvent(id, name);
   }
 
   @Override