NG
authorKai Moritz <kai@juplo.de>
Sat, 22 Apr 2023 09:51:48 +0000 (11:51 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Apr 2023 09:51:48 +0000 (11:51 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java [new file with mode: 0644]

index 216ff2e..5c08aa2 100644 (file)
@@ -8,7 +8,11 @@ import lombok.RequiredArgsConstructor;
 @RequiredArgsConstructor
 public class AbstractTo
 {
-  public enum ToType { MESSAGE_SENT, CREATE_CHATROOM_REQUEST };
+  public enum ToType {
+    CREATE_CHATROOM_REQUEST,
+    MESSAGE_SENT,
+    CHATROOM_INFO
+  }
 
   @Getter
   private final ToType type;
index 8294316..8a53d3c 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import lombok.Getter;
@@ -67,7 +68,44 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  Mono<Message> sendMessage(
+
+  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+      UUID chatRoomId,
+      String name)
+  {
+    CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, CreateChatRoomRequestTo> record =
+          new ProducerRecord<>(
+              topic,
+              chatRoomId.toString(),
+              createChatRoomRequestTo);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+          createChatRoom(chatRoomInfo);
+          sink.success(chatRoomInfo);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send create-request for chat room (id={}, name={}): {}",
+              chatRoomId,
+              name,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  Mono<Message> sendChatMessage(
       UUID chatRoomId,
       Message.MessageKey key,
       LocalDateTime timestamp,
@@ -202,14 +240,18 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   {
     for (ConsumerRecord<String, AbstractTo> record : records)
     {
+      UUID chatRoomId = UUID.fromString(record.key());
+
       switch (record.value().getType())
       {
         case CREATE_CHATROOM_REQUEST:
-          createChatRoom((CreateChatRoomRequestTo) record.value());
+          createChatRoom(
+              chatRoomId,
+              (CreateChatRoomRequestTo) record.value(),
+              record.partition());
           break;
 
         case MESSAGE_SENT:
-          UUID chatRoomId = UUID.fromString(record.key());
           Instant instant = Instant.ofEpochSecond(record.timestamp());
           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
           loadChatMessage(
@@ -226,10 +268,26 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   }
 
   void createChatRoom(
+      UUID chatRoomId,
       CreateChatRoomRequestTo createChatRoomRequestTo,
       int partition)
   {
-    chatrooms[partition].put
+    putChatRoom(ChatRoomInfo.of(
+        chatRoomId,
+        createChatRoomRequestTo.getName(),
+        partition));
+  }
+
+
+  void createChatRoom(ChatRoomInfo chatRoomInfo)
+  {
+    UUID id = chatRoomInfo.getId();
+    String name = chatRoomInfo.getName();
+    int shard = chatRoomInfo.getShard();
+    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+    putChatRoom(chatRoom);
   }
 
   void loadChatMessage(
@@ -267,7 +325,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  void putChatRoom(ChatRoom chatRoom)
+  private void putChatRoom(ChatRoom chatRoom)
   {
     Integer partition = chatRoom.getShard();
     UUID chatRoomId = chatRoom.getId();
index 57deaf2..8bbc82e 100644 (file)
@@ -32,41 +32,6 @@ public class ChatRoomChannel implements Runnable
   private boolean running;
 
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
-      UUID chatRoomId,
-      String name)
-  {
-    int shard = this.shardingStrategy.selectShard(chatRoomId);
-    CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(chatRoomId.toString(), name, shard);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<Integer, CreateChatRoomRequestTo> record =
-          new ProducerRecord<>(
-              topic,
-              shard,
-              createChatRoomRequestTo);
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
-          sink.success(createChatRoomRequestTo.toChatRoomInfo());
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
-              chatRoomId,
-              name,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
   @Override
   public void run()
   {
index eb57392..b6ad38f 100644 (file)
@@ -10,9 +10,7 @@ import java.util.UUID;
 @Data
 public class CreateChatRoomRequestTo extends AbstractTo
 {
-  private String id;
   private String name;
-  private int shard;
 
 
   public CreateChatRoomRequestTo()
@@ -21,22 +19,10 @@ public class CreateChatRoomRequestTo extends AbstractTo
   }
 
 
-  public ChatRoomInfo toChatRoomInfo()
-  {
-    return new ChatRoomInfo(UUID.fromString(id), name, shard);
-  }
-
-  public static CreateChatRoomRequestTo from(ChatRoom chatRoom)
-  {
-    return CreateChatRoomRequestTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
-  }
-
-  public static CreateChatRoomRequestTo of(String id,  String name, int shard)
+  public static CreateChatRoomRequestTo of(String name)
   {
     CreateChatRoomRequestTo to = new CreateChatRoomRequestTo();
-    to.id = id;
     to.name = name;
-    to.shard = shard;
     return to;
   }
 }
index 0986194..f802234 100644 (file)
@@ -28,7 +28,7 @@ public class KafkaChatRoomService implements ChatRoomService
     String text)
   {
     return chatMessageChannel
-        .sendMessage(chatRoomId, key, timestamp, text)
+        .sendChatMessage(chatRoomId, key, timestamp, text)
         .doOnSuccess(message -> persistMessage(message));
   }
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java
new file mode 100644 (file)
index 0000000..f232c78
--- /dev/null
@@ -0,0 +1,42 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class ChatRoomInfoTo extends AbstractTo
+{
+  private String id;
+  private String name;
+  private int shard;
+
+
+  public ChatRoomInfoTo()
+  {
+    super(ToType.CHATROOM_INFO);
+  }
+
+
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  public static ChatRoomInfoTo from(ChatRoom chatRoom)
+  {
+    return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+  }
+
+  public static ChatRoomInfoTo of(String id, String name, int shard)
+  {
+    ChatRoomInfoTo to = new ChatRoomInfoTo();
+    to.id = id;
+    to.name = name;
+    to.shard = shard;
+    return to;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java
new file mode 100644 (file)
index 0000000..6132eac
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatRoomInfoToTest
+{
+  final String json = """
+  {
+    "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+    "name": "Foo-Room!",
+    "shard": 666
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    ChatRoomInfoTo message = mapper.readValue(json, ChatRoomInfoTo.class);
+    assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+    assertThat(message.getName()).isEqualTo("Foo-Room!");
+    assertThat(message.getShard()).isEqualTo(666);
+  }
+}