]> juplo.de Git - demos/kafka/chat/commitdiff
WIP
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 09:15:19 +0000 (11:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 09:15:19 +0000 (11:15 +0200)
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 339451a8da61bf970a0adcdd8c90e5b24cc4848a..f41f45f63c522a6d27d1e5143da3741aee6a00c0 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.api;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.codec.ServerSentEvent;
@@ -18,7 +17,6 @@ import java.util.UUID;
 public class ChatBackendController
 {
   private final ChatHome chatHome;
-  private final ChatRoomFactory factory;
   private final StorageStrategy storageStrategy;
 
 
@@ -26,7 +24,7 @@ public class ChatBackendController
   public Mono<ChatRoomInfoTo> create(@RequestBody String name)
   {
     UUID chatRoomId = UUID.randomUUID();
-    return factory
+    return chatHome
         .createChatRoom(chatRoomId, name)
         .map(ChatRoomInfoTo::from);
   }
index 6091c0c5afd1a8288a7951be0771789f85ef5cff..e4d92dbbc2149afdf4d9c967171b29ec323b8e95 100644 (file)
@@ -8,6 +8,8 @@ import java.util.UUID;
 
 public interface ChatHome
 {
+  Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
+
   Mono<ChatRoom> getChatRoom(UUID id);
 
   Flux<ChatRoom> getChatRooms();
index b9463095b038e47e3419e4b123b6c51e29db3ea4..c66b887d9730fbbf09ff18d4e791eae13f1b2565 100644 (file)
@@ -26,7 +26,7 @@ public class ChatRoom extends ChatRoomInfo
   public ChatRoom(
       UUID id,
       String name,
-      int shard,
+      Integer shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java
deleted file mode 100644 (file)
index 603795d..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-public interface ChatRoomFactory
-{
-  Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
-}
index 6d88be95c490018ce5460389131b2217ce5f1f94..33c522d17e572e9060b36fb07409460603a12a15 100644 (file)
@@ -18,5 +18,5 @@ public class ChatRoomInfo
   @Getter
   private final String name;
   @Getter
-  private final int shard;
+  private final Integer shard;
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
deleted file mode 100644 (file)
index 855c401..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class InMemoryChatRoomFactory implements ChatRoomFactory
-{
-  private final ShardingStrategy shardingStrategy;
-  private final Clock clock;
-  private final int bufferSize;
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    int shard = shardingStrategy.selectShard(id);
-    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    chatHomeService.putChatRoom(chatRoom);
-    return Mono.just(chatRoom);
-  }
-}
index b8bc4b8ec9d598e790dd594edf58498ad470b567..c6aff1e3dbec0c64afeff7c05fade5a50248627b 100644 (file)
@@ -1,9 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -41,6 +38,15 @@ public class ShardedChatHome implements ChatHome
   }
 
 
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    int shard = shardingStrategy.selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard].createChatRoom(id, name);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
index 2987dd43852f561c83a955e4444209ef9bf595b4..c772d7eb4e5a08cecdc151022107cd48bf4cd0cd 100644 (file)
@@ -1,34 +1,42 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.Clock;
 import java.util.*;
 
 
 @Slf4j
 public class SimpleChatHome implements ChatHome
 {
-  private final Map<UUID, ChatRoom> chatrooms;
+  private final Integer shard;
+  private final Map<UUID, ChatRoom> chatRooms;
+  private final Clock clock;
+  private final int bufferSize;
 
 
 
-  public SimpleChatHome(Flux<ChatRoom> chatroomFlux)
+  public SimpleChatHome(
+      Flux<ChatRoom> chatroomFlux,
+      Clock clock,
+      int bufferSize)
   {
-    this(chatroomFlux, null);
+    this(null, chatroomFlux, clock, bufferSize);
   }
 
   public SimpleChatHome(
       Integer shard,
-      Flux<ChatRoom> chatroomFlux)
+      Flux<ChatRoom> chatroomFlux,
+      Clock clock,
+      int bufferSize)
   {
     log.info("Created SimpleChatHome for shard {}", shard);
-
-    this.chatrooms = new HashMap<>();
+;
+    this.shard = shard;
+    this.chatRooms = new HashMap<>();
     chatroomFlux
         .filter(chatRoom ->
         {
@@ -46,21 +54,33 @@ public class SimpleChatHome implements ChatHome
           }
         })
         .toStream()
-        .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+        .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+    this.clock = clock;
+    this.bufferSize = bufferSize;
   }
 
 
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+    chatRooms.put(id, chatRoom);
+    return Mono.just(chatRoom);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     return Mono
-        .justOrEmpty(chatrooms.get(id))
+        .justOrEmpty(chatRooms.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
   public Flux<ChatRoom> getChatRooms()
   {
-    return Flux.fromIterable(chatrooms.values());
+    return Flux.fromIterable(chatRooms.values());
   }
 }
index 07fb8858744f7853382db7a4c2cccfe42e1def54..06228396cede9f92a0a0dbbd4fbcd3299785ad3d 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -20,6 +21,14 @@ public class KafkaChatHome implements ChatHome
   private final ChatRoomChannel chatRoomChannel;
 
 
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Sending create-command for chat rooom: id={}, name={}");
+    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java
deleted file mode 100644 (file)
index 6a1dc78..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatRoomFactory implements ChatRoomFactory
-{
-  private final ChatRoomChannel chatRoomChannel;
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
-  }
-}
index 1cd41b533ce3a93ca6dff4bae2a74fa04d4a5e44..df4faed8f4bd080fbfd0a91a9df672280b389c53 100644 (file)
@@ -43,12 +43,6 @@ public class KafkaServicesConfiguration
         chatRoomChannel);
   }
 
-  @Bean
-  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
-  {
-    return new KafkaChatRoomFactory(chatRoomChannel);
-  }
-
   @Bean
   ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,