NEU
authorKai Moritz <kai@juplo.de>
Wed, 19 Apr 2023 15:18:57 +0000 (17:18 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 19 Apr 2023 15:18:57 +0000 (17:18 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java

index 7620461..230f822 100644 (file)
@@ -249,7 +249,37 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   void putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    Integer partition = chatRoom.getShard();
+    UUID chatRoomId = chatRoom.getId();
+    ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId);
+    if (existingChatRoom == null)
+    {
+      log.info(
+          "Creating new chat-room in partition {}: {}",
+          partition,
+          chatRoom);
+      chatrooms[partition].put(chatRoomId, chatRoom);
+    }
+    else
+    {
+      if (chatRoom.getShard() != existingChatRoom.getShard())
+      {
+        throw new IllegalArgumentException(
+            "Could not change the shard of existing chat-room " +
+            chatRoomId + " from " +
+            existingChatRoom.getShard() + " to " +
+            chatRoom.getShard());
+      }
+      else
+      {
+        log.info(
+            "Updating chat-room in partition {}: {} -> {}",
+            partition,
+            existingChatRoom,
+            chatRoom);
+        existingChatRoom.s
+      }
+    }
   }
 
   Mono<ChatRoom> getChatRoom(int shard, UUID id)
index da04554..b08a50d 100644 (file)
@@ -23,9 +23,10 @@ import java.util.UUID;
 public class ChatRoomChannel implements Runnable
 {
   private final String topic;
-  private final Consumer<String, ChatRoomTo> consumer;
-  private final Producer<String, ChatRoomTo> producer;
-  private final ChatRoomFactory chatRoomFactory;
+  private final Consumer<Integer, ChatRoomTo> consumer;
+  private final Producer<Integer, ChatRoomTo> producer;
+  private final ShardingStrategy shardingStrategy;
+  private final ChatMessageChannel chatMessageChannel;
 
   private boolean running;
 
@@ -41,10 +42,10 @@ public class ChatRoomChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<String, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
-        for (ConsumerRecord<String, ChatRoomTo> record : records)
+        for (ConsumerRecord<Integer, ChatRoomTo> record : records)
         {
           UUID id = record.value().getId();
           String name = record.value().getName();
@@ -60,17 +61,27 @@ public class ChatRoomChannel implements Runnable
     }
   }
 
+  void createChatRoom()
+  {
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
+    int shard = shardingStrategy.selectShard(id);
+    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+    chatMessageChannel.putChatRoom(chatRoom);
+  }
+
   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
       UUID chatRoomId,
       String name)
   {
-    ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name);
+    int shard = this.shardingStrategy.selectShard(chatRoomId);
+    ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, ChatRoomTo> record =
+      ProducerRecord<Integer, ChatRoomTo> record =
           new ProducerRecord<>(
               topic,
-              chatRoomId.toString(),
+              shard,
               chatRoomTo);
 
       producer.send(record, ((metadata, exception) ->
index 4b85f1c..9c196b2 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -15,9 +16,15 @@ public class ChatRoomTo
 {
   private UUID id;
   private String name;
+  private int shard;
 
   public ChatRoomInfo toChatRoomInfo()
   {
-    return new ChatRoomInfo(id, name, -1);
+    return new ChatRoomInfo(id, name, shard);
+  }
+
+  public static ChatRoomTo from(ChatRoom chatRoom)
+  {
+    return ChatRoomTo.of(chatRoom.getId(), chatRoom.getName(), chatRoom.getShard());
   }
 }
index ad9fe9f..825f16e 100644 (file)
@@ -1,14 +1,11 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Mono;
 
-import java.time.Clock;
 import java.util.UUID;
 
 
@@ -16,19 +13,12 @@ import java.util.UUID;
 @Slf4j
 public class KafkaChatRoomFactory implements ChatRoomFactory
 {
-  private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
-  private final Clock clock;
-  private final ChatMessageChannel chatMessageChannel;
+  private final ChatRoomChannel chatRoomChannel;
 
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
-    int shard = shardingStrategy.selectShard(id);
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    chatMessageChannel.putChatRoom(chatRoom);
-    return Mono.just(chatRoom);
+    log.info("Sending create-request for chat rooom: id={}, name={}");
+    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
   }
 }