WIP
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 09:15:19 +0000 (11:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 09:15:19 +0000 (11:15 +0200)
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 339451a..f41f45f 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.api;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.codec.ServerSentEvent;
@@ -18,7 +17,6 @@ import java.util.UUID;
 public class ChatBackendController
 {
   private final ChatHome chatHome;
-  private final ChatRoomFactory factory;
   private final StorageStrategy storageStrategy;
 
 
@@ -26,7 +24,7 @@ public class ChatBackendController
   public Mono<ChatRoomInfoTo> create(@RequestBody String name)
   {
     UUID chatRoomId = UUID.randomUUID();
-    return factory
+    return chatHome
         .createChatRoom(chatRoomId, name)
         .map(ChatRoomInfoTo::from);
   }
index 6091c0c..e4d92db 100644 (file)
@@ -8,6 +8,8 @@ import java.util.UUID;
 
 public interface ChatHome
 {
+  Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
+
   Mono<ChatRoom> getChatRoom(UUID id);
 
   Flux<ChatRoom> getChatRooms();
index b946309..c66b887 100644 (file)
@@ -26,7 +26,7 @@ public class ChatRoom extends ChatRoomInfo
   public ChatRoom(
       UUID id,
       String name,
-      int shard,
+      Integer shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java
deleted file mode 100644 (file)
index 603795d..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-public interface ChatRoomFactory
-{
-  Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
-}
index 6d88be9..33c522d 100644 (file)
@@ -18,5 +18,5 @@ public class ChatRoomInfo
   @Getter
   private final String name;
   @Getter
-  private final int shard;
+  private final Integer shard;
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
deleted file mode 100644 (file)
index 855c401..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class InMemoryChatRoomFactory implements ChatRoomFactory
-{
-  private final ShardingStrategy shardingStrategy;
-  private final Clock clock;
-  private final int bufferSize;
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    int shard = shardingStrategy.selectShard(id);
-    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    chatHomeService.putChatRoom(chatRoom);
-    return Mono.just(chatRoom);
-  }
-}
index b8bc4b8..c6aff1e 100644 (file)
@@ -1,9 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -41,6 +38,15 @@ public class ShardedChatHome implements ChatHome
   }
 
 
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    int shard = shardingStrategy.selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard].createChatRoom(id, name);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
index 2987dd4..c772d7e 100644 (file)
@@ -1,34 +1,42 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.Clock;
 import java.util.*;
 
 
 @Slf4j
 public class SimpleChatHome implements ChatHome
 {
-  private final Map<UUID, ChatRoom> chatrooms;
+  private final Integer shard;
+  private final Map<UUID, ChatRoom> chatRooms;
+  private final Clock clock;
+  private final int bufferSize;
 
 
 
-  public SimpleChatHome(Flux<ChatRoom> chatroomFlux)
+  public SimpleChatHome(
+      Flux<ChatRoom> chatroomFlux,
+      Clock clock,
+      int bufferSize)
   {
-    this(chatroomFlux, null);
+    this(null, chatroomFlux, clock, bufferSize);
   }
 
   public SimpleChatHome(
       Integer shard,
-      Flux<ChatRoom> chatroomFlux)
+      Flux<ChatRoom> chatroomFlux,
+      Clock clock,
+      int bufferSize)
   {
     log.info("Created SimpleChatHome for shard {}", shard);
-
-    this.chatrooms = new HashMap<>();
+;
+    this.shard = shard;
+    this.chatRooms = new HashMap<>();
     chatroomFlux
         .filter(chatRoom ->
         {
@@ -46,21 +54,33 @@ public class SimpleChatHome implements ChatHome
           }
         })
         .toStream()
-        .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+        .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+    this.clock = clock;
+    this.bufferSize = bufferSize;
   }
 
 
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+    chatRooms.put(id, chatRoom);
+    return Mono.just(chatRoom);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     return Mono
-        .justOrEmpty(chatrooms.get(id))
+        .justOrEmpty(chatRooms.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
   public Flux<ChatRoom> getChatRooms()
   {
-    return Flux.fromIterable(chatrooms.values());
+    return Flux.fromIterable(chatRooms.values());
   }
 }
index 07fb885..0622839 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -20,6 +21,14 @@ public class KafkaChatHome implements ChatHome
   private final ChatRoomChannel chatRoomChannel;
 
 
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Sending create-command for chat rooom: id={}, name={}");
+    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java
deleted file mode 100644 (file)
index 6a1dc78..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatRoomFactory implements ChatRoomFactory
-{
-  private final ChatRoomChannel chatRoomChannel;
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
-  }
-}
index 1cd41b5..df4faed 100644 (file)
@@ -43,12 +43,6 @@ public class KafkaServicesConfiguration
         chatRoomChannel);
   }
 
-  @Bean
-  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
-  {
-    return new KafkaChatRoomFactory(chatRoomChannel);
-  }
-
   @Bean
   ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,