refactor: Splitted `ChatRoomInfo` and `ChatRoomData` - Aligned Code
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 17:54:46 +0000 (19:54 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 15:38:57 +0000 (16:38 +0100)
33 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java [deleted file]
src/test/resources/data/mongodb/0001.sh
src/test/resources/data/mongodb/chatRoomTo.json
src/test/resources/data/mongodb/messageTo.json [new file with mode: 0644]

index 8e1ff9e..15f8ae1 100644 (file)
@@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @Autowired
        ChatBackendProperties properties;
        @Autowired
-       ChatHome[] chatHomes;
+       ChatHome chatHome;
        @Autowired
        StorageStrategy storageStrategy;
 
@@ -32,8 +32,7 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @PreDestroy
        public void onExit()
        {
-               for (int shard = 0; shard < chatHomes.length; shard++)
-                       storageStrategy.write(chatHomes[shard].getChatRooms());
+               storageStrategy.write(chatHome);
        }
 
        public static void main(String[] args)
index f41f45f..e2fe714 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.api;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.codec.ServerSentEvent;
@@ -33,49 +33,49 @@ public class ChatBackendController
   public Flux<ChatRoomInfoTo> list()
   {
     return chatHome
-        .getChatRooms()
-        .map(chatroom -> ChatRoomInfoTo.from(chatroom));
+        .getChatRoomInfo()
+        .map(chatroomInfo -> ChatRoomInfoTo.from(chatroomInfo));
   }
 
-  @GetMapping("{chatroomId}/list")
-  public Flux<MessageTo> list(@PathVariable UUID chatroomId)
+  @GetMapping("{chatRoomId}/list")
+  public Flux<MessageTo> list(@PathVariable UUID chatRoomId)
   {
     return chatHome
-        .getChatRoom(chatroomId)
-        .flatMapMany(chatroom -> chatroom
+        .getChatRoomData(chatRoomId)
+        .flatMapMany(chatRoomData -> chatRoomData
             .getMessages()
             .map(MessageTo::from));
   }
 
-  @GetMapping("{chatroomId}")
-  public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatroomId)
+  @GetMapping("{chatRoomId}")
+  public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatRoomId)
   {
     return chatHome
-        .getChatRoom(chatroomId)
-        .map(chatroom -> ChatRoomInfoTo.from(chatroom));
+        .getChatRoomInfo(chatRoomId)
+        .map(chatRoomInfo -> ChatRoomInfoTo.from(chatRoomInfo));
   }
 
-  @PutMapping("{chatroomId}/{username}/{messageId}")
+  @PutMapping("{chatRoomId}/{username}/{messageId}")
   public Mono<MessageTo> put(
-      @PathVariable UUID chatroomId,
+      @PathVariable UUID chatRoomId,
       @PathVariable String username,
       @PathVariable Long messageId,
       @RequestBody String text)
   {
     return
         chatHome
-            .getChatRoom(chatroomId)
-            .flatMap(chatroom -> put(chatroom, username, messageId, text));
+            .getChatRoomData(chatRoomId)
+            .flatMap(chatRoomData -> put(chatRoomData, username, messageId, text));
   }
 
   private Mono<MessageTo> put(
-      ChatRoom chatroom,
+      ChatRoomData chatRoomData,
       String username,
       Long messageId,
       String text)
   {
     return
-        chatroom
+        chatRoomData
             .addMessage(
                 messageId,
                 username,
@@ -83,40 +83,40 @@ public class ChatBackendController
             .map(message -> MessageTo.from(message));
   }
 
-  @GetMapping("{chatroomId}/{username}/{messageId}")
+  @GetMapping("{chatRoomId}/{username}/{messageId}")
   public Mono<MessageTo> get(
-      @PathVariable UUID chatroomId,
+      @PathVariable UUID chatRoomId,
       @PathVariable String username,
       @PathVariable Long messageId)
   {
     return
         chatHome
-            .getChatRoom(chatroomId)
-            .flatMap(chatroom -> get(chatroom, username, messageId));
+            .getChatRoomData(chatRoomId)
+            .flatMap(chatRoomData -> get(chatRoomData, username, messageId));
   }
 
   private Mono<MessageTo> get(
-      ChatRoom chatroom,
+      ChatRoomData chatRoomData,
       String username,
       Long messageId)
   {
     return
-        chatroom
+        chatRoomData
             .getMessage(username, messageId)
             .map(message -> MessageTo.from(message));
   }
 
-  @GetMapping(path = "{chatroomId}/listen")
-  public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
+  @GetMapping(path = "{chatRoomId}/listen")
+  public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatRoomId)
   {
     return chatHome
-        .getChatRoom(chatroomId)
-        .flatMapMany(chatroom -> listen(chatroom));
+        .getChatRoomData(chatRoomId)
+        .flatMapMany(chatRoomData -> listen(chatRoomData));
   }
 
-  private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
+  private Flux<ServerSentEvent<MessageTo>> listen(ChatRoomData chatRoomData)
   {
-    return chatroom
+    return chatRoomData
         .listen()
         .log()
         .map(message -> MessageTo.from(message))
@@ -131,6 +131,6 @@ public class ChatBackendController
   @PostMapping("/store")
   public void store()
   {
-    storageStrategy.write(chatHome.getChatRooms());
+    storageStrategy.write(chatHome);
   }
 }
index e4d92db..2ff59cb 100644 (file)
@@ -10,7 +10,9 @@ public interface ChatHome
 {
   Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
 
-  Mono<ChatRoom> getChatRoom(UUID id);
+  Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
 
-  Flux<ChatRoom> getChatRooms();
+  Flux<ChatRoomInfo> getChatRoomInfo();
+
+  Mono<ChatRoomData> getChatRoomData(UUID id);
 }
index 873e58e..511b9ad 100644 (file)
@@ -13,17 +13,17 @@ import java.util.regex.Pattern;
 
 
 @Slf4j
-public class ChatRoom
+public class ChatRoomData
 {
   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
 
-  private final Clock clock;
   private final ChatRoomService service;
+  private final Clock clock;
   private final int bufferSize;
   private Sinks.Many<Message> sink;
 
 
-  public ChatRoom(
+  public ChatRoomData(
       Clock clock,
       ChatRoomService service,
       int bufferSize)
index bedd0aa..7d0f66d 100644 (file)
@@ -1,11 +1,30 @@
 package de.juplo.kafka.chat.backend.persistence;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import reactor.core.publisher.Flux;
 
+import java.util.UUID;
+
 
 public interface StorageStrategy
 {
-  void write(Flux<ChatRoom> chatroomFlux);
-  Flux<ChatRoom> read();
+  default void write(ChatHome chatHome)
+  {
+    writeChatRoomInfo(
+        chatHome
+            .getChatRoomInfo()
+            .doOnNext(chatRoomInfo ->
+                writeChatRoomData(
+                    chatRoomInfo.getId(),
+                    chatHome
+                        .getChatRoomData(chatRoomInfo.getId())
+                        .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+  }
+
+  void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+  Flux<ChatRoomInfo> readChatRoomInfo();
+  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+  Flux<Message> readChatRoomData(UUID chatRoomId);
 }
index 106c736..4c90d6a 100644 (file)
@@ -31,7 +31,7 @@ public class InMemoryServicesConfiguration
       Clock clock)
   {
     return new SimpleChatHome(
-        storageStrategy.read(),
+        storageStrategy,
         clock,
         properties.getChatroomBufferSize());
   }
@@ -52,7 +52,7 @@ public class InMemoryServicesConfiguration
         .of(properties.getInmemory().getOwnedShards())
         .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
             shard,
-            storageStrategy.read(),
+            storageStrategy,
             clock,
             properties.getChatroomBufferSize()));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
index c6aff1e..eb3a6ce 100644 (file)
@@ -14,13 +14,13 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHome implements ChatHome
 {
-  private final ChatHome[] chatHomes;
+  private final SimpleChatHome[] chatHomes;
   private final Set<Integer> ownedShards;
   private final ShardingStrategy shardingStrategy;
 
 
   public  ShardedChatHome(
-      ChatHome[] chatHomes,
+      SimpleChatHome[] chatHomes,
       ShardingStrategy shardingStrategy)
   {
     this.chatHomes = chatHomes;
@@ -48,13 +48,13 @@ public class ShardedChatHome implements ChatHome
   }
 
   @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
         ? Mono.error(new ShardNotOwnedException(shard))
         : chatHomes[shard]
-            .getChatRoom(id)
+            .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
             ? new UnknownChatroomException(
                 id,
@@ -64,13 +64,37 @@ public class ShardedChatHome implements ChatHome
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Flux<ChatRoomInfo> getChatRoomInfo()
   {
     return Flux
         .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRooms());
+        .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
   }
 
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomData(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+                ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+                : throwable);
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRoomData());
+  }
+
+
 
   private int selectShard(UUID chatroomId)
   {
index c2d25b2..c8ddbf9 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -13,34 +14,41 @@ import java.util.*;
 public class SimpleChatHome implements ChatHome
 {
   private final Integer shard;
-  private final Map<UUID, ChatRoom> chatRooms;
+  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
   private final int bufferSize;
 
 
 
   public SimpleChatHome(
-      Flux<ChatRoom> chatroomFlux,
+      StorageStrategy storageStrategy,
       Clock clock,
       int bufferSize)
   {
-    this(null, chatroomFlux, clock, bufferSize);
+    this(
+        null,
+        storageStrategy,
+        clock,
+        bufferSize);
   }
 
   public SimpleChatHome(
       Integer shard,
-      Flux<ChatRoom> chatroomFlux,
+      StorageStrategy storageStrategy,
       Clock clock,
       int bufferSize)
   {
     log.info("Created SimpleChatHome for shard {}", shard);
 ;
     this.shard = shard;
-    this.chatRooms = new HashMap<>();
-    chatroomFlux
-        .filter(chatRoom ->
+    this.chatRoomInfo = new HashMap<>();
+    this.chatRoomData = new HashMap<>();
+    storageStrategy
+        .readChatRoomInfo()
+        .filter(info ->
         {
-          if (shard == null || chatRoom.getShard() == shard)
+          if (shard == null || info.getShard() == shard)
           {
             return true;
           }
@@ -49,12 +57,24 @@ public class SimpleChatHome implements ChatHome
             log.info(
                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
                 shard,
-                chatRoom);
+                info);
             return false;
           }
         })
         .toStream()
-        .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+        .forEach(info ->
+        {
+          UUID chatRoomId = info.getId();
+          chatRoomInfo.put(chatRoomId, info);
+          Flux<Message> messageFlux =
+              storageStrategy.readChatRoomData(chatRoomId);
+          chatRoomData.put(
+              info.getId(),
+              new ChatRoomData(
+                  clock,
+                  new InMemoryChatRoomService(messageFlux),
+                  bufferSize));
+        });
     this.clock = clock;
     this.bufferSize = bufferSize;
   }
@@ -65,22 +85,37 @@ public class SimpleChatHome implements ChatHome
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    chatRooms.put(id, chatRoom);
-    return Mono.just(chatRoom);
+    ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+    this.chatRoomInfo.put(id, chatRoomInfo);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    this.chatRoomData.put(id, chatRoomData);
+    return Mono.just(chatRoomInfo);
   }
 
   @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
     return Mono
-        .justOrEmpty(chatRooms.get(id))
+        .justOrEmpty(chatRoomInfo.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux.fromIterable(chatRoomInfo.values());
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    return Mono
+        .justOrEmpty(chatRoomData.get(id))
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
   {
-    return Flux.fromIterable(chatRooms.values());
+    return Flux.fromIterable(chatRoomData.values());
   }
 }
index 234554e..45e93cc 100644 (file)
@@ -35,7 +35,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoom>[] chatrooms;
+  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+  private final Map<UUID, ChatRoomData>[] chatRoomData;
 
   private boolean running;
   @Getter
@@ -65,10 +66,15 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatrooms = new Map[numShards];
+    this.chatRoomInfo = new Map[numShards];
+    this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+        .forEach(shard ->
+        {
+          this.chatRoomInfo[shard] = new HashMap<>();
+          this.chatRoomData[shard] = new HashMap<>();
+        });
   }
 
 
@@ -277,30 +283,37 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private void createChatRoom(
       UUID chatRoomId,
       CommandCreateChatRoomTo createChatRoomRequestTo,
-      int partition)
+      Integer partition)
   {
-    log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
-    ChatRoom chatRoom = new ChatRoom(
+    log.info(
+        "Loading ChatRoom {} for shard {} with buffer-size {}",
         chatRoomId,
-        createChatRoomRequestTo.getName(),
         partition,
+        bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+    ChatRoomData chatRoomData = new ChatRoomData(
         clock,
         service,
         bufferSize);
-    putChatRoom(chatRoom);
+    putChatRoom(
+        chatRoomId,
+        createChatRoomRequestTo.getName(),
+        partition,
+        chatRoomData);
   }
 
 
   private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
     UUID id = chatRoomInfo.getId();
-    String name = chatRoomInfo.getName();
-    int shard = chatRoomInfo.getShard();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    putChatRoom(chatRoom);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    putChatRoom(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getName(),
+        chatRoomInfo.getShard(),
+        chatRoomData);
   }
 
   private void loadChatMessage(
@@ -313,9 +326,9 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-    ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
     KafkaChatRoomService kafkaChatRoomService =
-        (KafkaChatRoomService) chatRoom.getChatRoomService();
+        (KafkaChatRoomService) chatRoomData.getChatRoomService();
 
     kafkaChatRoomService.persistMessage(message);
   }
@@ -338,22 +351,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void putChatRoom(ChatRoom chatRoom)
+  private void putChatRoom(
+      UUID chatRoomId,
+      String name,
+      Integer partition,
+      ChatRoomData chatRoomData)
   {
-    Integer partition = chatRoom.getShard();
-    UUID chatRoomId = chatRoom.getId();
-    if (chatrooms[partition].containsKey(chatRoomId))
+    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
     {
-      log.warn("Ignoring existing chat-room: " + chatRoom);
+      log.warn(
+          "Ignoring existing chat-room for {}: {}",
+          partition,
+          chatRoomId);
     }
     else
     {
       log.info(
           "Adding new chat-room to partition {}: {}",
           partition,
-          chatRoom);
+          chatRoomData);
 
-      chatrooms[partition].put(chatRoomId, chatRoom);
+      this.chatRoomInfo[partition].put(
+          chatRoomId,
+          new ChatRoomInfo(chatRoomId, name, partition));
+      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
     }
   }
 
@@ -365,7 +386,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
         .toArray();
   }
 
-  Mono<ChatRoom> getChatRoom(int shard, UUID id)
+  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
+  {
+    if (loadInProgress)
+    {
+      return Mono.error(new LoadInProgressException());
+    }
+
+    if (!isShardOwned[shard])
+    {
+      return Mono.error(new ShardNotOwnedException(shard));
+    }
+
+    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+  }
+
+  Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux
+        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+        .filter(shard -> isShardOwned[shard])
+        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+  }
+
+  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
   {
     if (loadInProgress)
     {
@@ -377,14 +421,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
       return Mono.error(new ShardNotOwnedException(shard));
     }
 
-    return Mono.justOrEmpty(chatrooms[shard].get(id));
+    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
   }
 
-  Flux<ChatRoom> getChatRooms()
+  Flux<ChatRoomData> getChatRoomData()
   {
     return Flux
         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
         .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
   }
 }
index 0622839..8c749d6 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
 import lombok.RequiredArgsConstructor;
@@ -30,26 +30,43 @@ public class KafkaChatHome implements ChatHome
   }
 
   @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
     int shard = selectShard(id);
     return chatRoomChannel
-        .getChatRoom(shard, id)
+        .getChatRoomInfo(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
             chatRoomChannel.getOwnedShards())));
   }
 
-  int selectShard(UUID chatRoomId)
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    byte[] serializedKey = chatRoomId.toString().getBytes();
-    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+    return chatRoomChannel.getChatRoomInfo();
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
   {
-      return chatRoomChannel.getChatRooms();
+    int shard = selectShard(id);
+    return chatRoomChannel
+        .getChatRoomData(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+            id,
+            shard,
+            chatRoomChannel.getOwnedShards())));
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+      return chatRoomChannel.getChatRoomData();
+  }
+
+  int selectShard(UUID chatRoomId)
+  {
+    byte[] serializedKey = chatRoomId.toString().getBytes();
+    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java
deleted file mode 100644 (file)
index d06c8f9..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Flux;
-
-
-public interface ChatRoomServiceFactory
-{
-  ChatRoomService create(Flux<Message> messageFlux);
-}
index 11b6440..5eb3bbd 100644 (file)
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
@@ -13,7 +12,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.nio.file.Paths;
-import java.time.Clock;
 
 
 @ConditionalOnProperty(
@@ -30,16 +28,12 @@ public class FilesStorageConfiguration
   @Bean
   public StorageStrategy storageStrategy(
       ChatBackendProperties properties,
-      Clock clock,
       ShardingStrategy shardingStrategy,
       ObjectMapper mapper)
   {
     return new FilesStorageStrategy(
         Paths.get(properties.getInmemory().getStorageDirectory()),
-        clock,
-        properties.getChatroomBufferSize(),
         shardingStrategy,
-        messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
 }
index 025e3ae..1f0ebad 100644 (file)
@@ -5,10 +5,10 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
@@ -16,7 +16,6 @@ import reactor.core.publisher.Flux;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Clock;
 import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -31,15 +30,12 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   private final Path storagePath;
-  private final Clock clock;
-  private final int bufferSize;
   private final ShardingStrategy shardingStrategy;
-  private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
 
   @Override
-  public void write(Flux<ChatRoom> chatroomFlux)
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -52,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatroomFlux
+      chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -78,13 +74,12 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(chatroom ->
+          .subscribe(chatRoomInfo ->
           {
             try
             {
-              ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
-              generator.writeObject(infoTo);
-              writeMessages(infoTo, chatroom.getMessages());
+              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+              generator.writeObject(chatRoomInfoTo);
             }
             catch (IOException e)
             {
@@ -99,30 +94,37 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public Flux<ChatRoom> read()
+  public Flux<ChatRoomInfo> readChatRoomInfo()
   {
     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
     return Flux
         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(infoTo ->
+        .map(chatRoomInfoTo ->
         {
-          UUID chatRoomId = infoTo.getId();
+          UUID chatRoomId = chatRoomInfoTo.getId();
           int shard = shardingStrategy.selectShard(chatRoomId);
-          return new ChatRoom(
-              infoTo.getId(),
-              infoTo.getName(),
-              shard,
-              clock,
-              factory.create(readMessages(infoTo)),
-              bufferSize);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomInfoTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomInfoTo.getName(),
+              shard);
         });
   }
 
-  public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
+  @Override
+  public void writeChatRoomData(
+      UUID chatRoomId,
+      Flux<Message> messageFlux)
   {
-    Path path = chatroomPath(infoTo);
-    log.info("Writing messages for {} to {}", infoTo, path);
+    Path path = chatroomPath(chatRoomId);
+    log.info("Writing messages for {} to {}", chatRoomId, path);
     try
     {
       Files.createDirectories(storagePath);
@@ -177,11 +179,12 @@ public class FilesStorageStrategy implements StorageStrategy
     }
   }
 
-  public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
+  @Override
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
   {
     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
     return Flux
-        .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
         .log()
         .map(MessageTo::toMessage);
   }
@@ -191,8 +194,8 @@ public class FilesStorageStrategy implements StorageStrategy
     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
   }
 
-  Path chatroomPath(ChatRoomInfoTo infoTo)
+  Path chatroomPath(UUID id)
   {
-    return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
+    return storagePath.resolve(Path.of(id.toString() + ".json"));
   }
 }
index 1ad8d17..0086053 100644 (file)
@@ -1,36 +1,30 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import lombok.*;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
-import java.util.List;
-
 
 @AllArgsConstructor
 @NoArgsConstructor
 @Getter(AccessLevel.PACKAGE)
 @Setter(AccessLevel.PACKAGE)
 @EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "name" })
+@ToString(of = { "id", "shard", "name" })
 @Document
 public class ChatRoomTo
 {
   @Id
   private String id;
+  private Integer shard;
   private String name;
-  private List<MessageTo> messages;
 
-  public static ChatRoomTo from(ChatRoom chatroom)
+  public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
-        chatroom.getId().toString(),
-        chatroom.getName(),
-        chatroom
-            .getMessages()
-            .map(MessageTo::from)
-            .collectList()
-            .block());
+        chatRoomInfo.getId().toString(),
+        chatRoomInfo.getShard(),
+        chatRoomInfo.getName());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java
new file mode 100644 (file)
index 0000000..a429f96
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.List;
+
+
+public interface MessageRepository extends MongoRepository<MessageTo, String>
+{
+  List<MessageTo> findByChatRoomIdOrderBySerialAsc(String chatRoomId);
+}
index 4f93695..f6c6b85 100644 (file)
@@ -2,45 +2,51 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.domain.Message;
 import lombok.*;
+import org.springframework.data.mongodb.core.index.Indexed;
+import org.springframework.data.mongodb.core.mapping.Document;
+import org.springframework.data.mongodb.core.mapping.Field;
 
 import java.time.LocalDateTime;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.UUID;
 
 
 @AllArgsConstructor
 @NoArgsConstructor
 @Getter(AccessLevel.PACKAGE)
 @Setter(AccessLevel.PACKAGE)
-@EqualsAndHashCode(of = { "user", "id" })
-@ToString(of = { "user", "id" })
+@EqualsAndHashCode(of = { "chatRoomId", "user", "id" })
+@ToString(of = { "chatRoomId", "user", "id" })
+@Document
 class MessageTo
 {
-  final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$");
-  private String id;
+  @Indexed
+  private String chatRoomId;
+  @Indexed
+  private String user;
+  @Field("id")
+  @Indexed
+  private Long id;
+  @Indexed
   private Long serial;
   private String time;
   private String text;
 
   Message toMessage()
   {
-    Matcher matcher = SPLIT_ID.matcher(id);
-    if (!matcher.matches())
-      throw new RuntimeException("MessageTo with invalid ID: " + id);
-    Long messageId = Long.parseLong(matcher.group(2));
-    String user = matcher.group(1);
     return new Message(
-        Message.MessageKey.of(user, messageId),
+        Message.MessageKey.of(user, id),
         serial,
         LocalDateTime.parse(time),
         text);
   }
 
-  static MessageTo from(Message message)
+  static MessageTo from(UUID chatRoomId, Message message)
   {
     return
         new MessageTo(
-             message.getUsername() + "--" + message.getId(),
+            chatRoomId.toString(),
+            message.getUsername(),
+            message.getId(),
             message.getSerialNumber(),
             message.getTimestamp().toString(),
             message.getMessageText());
index 9ef38a7..20e6ed4 100644 (file)
@@ -1,15 +1,11 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.time.Clock;
-
 
 @ConditionalOnProperty(
     prefix = "chat.backend.inmemory",
@@ -21,15 +17,12 @@ public class MongoDbStorageConfiguration
   @Bean
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
-      ChatBackendProperties properties,
-      Clock clock,
+      MessageRepository messageRepository,
       ShardingStrategy shardingStrategy)
   {
     return new MongoDbStorageStrategy(
         chatRoomRepository,
-        clock,
-        properties.getChatroomBufferSize(),
-        shardingStrategy,
-        messageFlux -> new InMemoryChatRoomService(messageFlux));
+        messageRepository,
+        shardingStrategy);
   }
 }
index d3a18bc..bc821e6 100644 (file)
@@ -1,14 +1,13 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 
-import java.time.Clock;
 import java.util.UUID;
 
 
@@ -16,40 +15,55 @@ import java.util.UUID;
 @Slf4j
 public class MongoDbStorageStrategy implements StorageStrategy
 {
-  private final ChatRoomRepository repository;
-  private final Clock clock;
-  private final int bufferSize;
+  private final ChatRoomRepository chatRoomRepository;
+  private final MessageRepository messageRepository;
   private final ShardingStrategy shardingStrategy;
-  private final ChatRoomServiceFactory factory;
 
 
   @Override
-  public void write(Flux<ChatRoom> chatroomFlux)
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
-    chatroomFlux
+    chatRoomInfoFlux
         .map(ChatRoomTo::from)
-        .subscribe(chatroomTo -> repository.save(chatroomTo));
+        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
   }
 
   @Override
-  public Flux<ChatRoom> read()
+  public Flux<ChatRoomInfo> readChatRoomInfo()
   {
     return Flux
-        .fromIterable(repository.findAll())
+        .fromIterable(chatRoomRepository.findAll())
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
           int shard = shardingStrategy.selectShard(chatRoomId);
-          return new ChatRoom(
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
               chatRoomId,
               chatRoomTo.getName(),
-              shard,
-              clock,
-              factory.create(
-                  Flux
-                      .fromIterable(chatRoomTo.getMessages())
-                      .map(messageTo -> messageTo.toMessage())),
-              bufferSize);
+              shard);
         });
   }
+
+  @Override
+  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  {
+    messageFlux
+        .map(message -> MessageTo.from(chatRoomId, message))
+        .subscribe(messageTo -> messageRepository.save(messageTo));
+  }
+
+  @Override
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
+  {
+    return Flux
+        .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+        .map(messageTo -> messageTo.toMessage());
+  }
 }
index ffed299..ab24bb8 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -10,6 +11,8 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import reactor.core.publisher.Flux;
 
+import java.util.UUID;
+
 
 @ConditionalOnProperty(
     prefix = "chat.backend.inmemory",
@@ -29,10 +32,19 @@ public class NoStorageStorageConfiguration
     return new StorageStrategy()
     {
       @Override
-      public void write(Flux<ChatRoom> chatroomFlux) {}
+      public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+
+      @Override
+      public Flux<ChatRoomInfo> readChatRoomInfo()
+      {
+        return Flux.empty();
+      }
+
+      @Override
+      public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
 
       @Override
-      public Flux<ChatRoom> read()
+      public Flux<Message> readChatRoomData(UUID chatRoomId)
       {
         return Flux.empty();
       }
index 9b8b09f..a7e4078 100644 (file)
@@ -42,7 +42,7 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -62,7 +62,7 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+    when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -83,7 +83,7 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -109,7 +109,7 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -132,7 +132,7 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -170,13 +170,11 @@ public class ChatBackendControllerTest
     LocalDateTime timeExistingMessage = LocalDateTime.parse(timeExistingMessageAsString);
     String textExistingMessage = "Existing";
     String textMutatedMessage = "Mutated!";
-    ChatRoom chatRoom = new ChatRoom(
-        chatroomId,
-        "Test-ChatRoom",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
-        chatRoomService, 8);
-    when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoom));
+        chatRoomService,
+        8);
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
     Message existingMessage = new Message(
         key,
         serialNumberExistingMessage,
@@ -222,14 +220,12 @@ public class ChatBackendControllerTest
     Long messageId = 66l;
     Message.MessageKey key = Message.MessageKey.of(user, messageId);
     String textMessage = "Hallo Welt";
-    ChatRoom chatRoom = new ChatRoom(
-        chatroomId,
-        "Test-ChatRoom",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
-        chatRoomService, 8);
-    when(chatHome.getChatRoom(any(UUID.class)))
-        .thenReturn(Mono.just(chatRoom));
+        chatRoomService,
+        8);
+    when(chatHome.getChatRoomData(any(UUID.class)))
+        .thenReturn(Mono.just(chatRoomData));
     when(chatRoomService.getMessage(any(Message.MessageKey.class)))
         .thenReturn(Mono.empty());
     // Needed for readable error-reports, in case of a bug that leads to according unwanted call
@@ -262,7 +258,7 @@ public class ChatBackendControllerTest
     // Given
     UUID chatroomId = UUID.randomUUID();
     int shard = 666;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -282,7 +278,7 @@ public class ChatBackendControllerTest
     // Given
     UUID chatroomId = UUID.randomUUID();
     int shard = 666;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -304,7 +300,7 @@ public class ChatBackendControllerTest
     String username = "foo";
     Long messageId = 66l;
     int shard = 666;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -331,7 +327,7 @@ public class ChatBackendControllerTest
     String username = "foo";
     Long messageId = 66l;
     int shard = 666;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -355,7 +351,7 @@ public class ChatBackendControllerTest
     // Given
     UUID chatroomId = UUID.randomUUID();
     int shard = 666;
-    when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
index 2a995ef..5b50314 100644 (file)
@@ -29,8 +29,8 @@ public abstract class ChatHomeTest
     UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
 
     // When
-    Mono<ChatRoom> mono = Mono
-        .defer(() -> chatHome.getChatRoom(chatRoomId))
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHome.getChatRoomData(chatRoomId))
         .log("testGetExistingChatroom")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
@@ -48,8 +48,8 @@ public abstract class ChatHomeTest
     UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
 
     // When
-    Mono<ChatRoom> mono = Mono
-        .defer(() -> chatHome.getChatRoom(chatRoomId))
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHome.getChatRoomData(chatRoomId))
         .log("testGetNonExistentChatroom")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
index eebeea1..c65908c 100644 (file)
@@ -26,8 +26,8 @@ public abstract class ChatHomeWithShardsTest extends ChatHomeTest
     UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
 
     // When
-    Mono<ChatRoom> mono = Mono
-        .defer(() -> chatHome.getChatRoom(chatRoomId))
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHome.getChatRoomData(chatRoomId))
         .log("testGetChatroomForNotOwnedShard")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
index 822ffe7..790482f 100644 (file)
@@ -8,13 +8,12 @@ import java.time.Clock;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.UUID;
 
 import static org.mockito.Mockito.*;
 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
 
 
-public class ChatRoomTest
+public class ChatRoomDataTest
 {
   @Test
   @DisplayName("Assert, that Mono emits expected message, if it exists")
@@ -24,10 +23,7 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -37,7 +33,7 @@ public class ChatRoomTest
     when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message));
 
     // When
-    Mono<Message> mono = chatRoom.getMessage(user, messageId);
+    Mono<Message> mono = chatRoomData.getMessage(user, messageId);
 
     // Then
     assertThat(mono).emitsExactly(message);
@@ -51,17 +47,14 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
     when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty());
 
     // When
-    Mono<Message> mono = chatRoom.getMessage(user, messageId);
+    Mono<Message> mono = chatRoomData.getMessage(user, messageId);
 
     // Then
     assertThat(mono).emitsCount(0);
@@ -75,10 +68,7 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -91,7 +81,7 @@ public class ChatRoomTest
     when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
 
     // When
-    Mono<Message> mono = chatRoom.addMessage(messageId, user, messageText);
+    Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
 
     // Then
     assertThat(mono).emitsExactly(message);
@@ -105,10 +95,7 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -121,7 +108,7 @@ public class ChatRoomTest
     when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
 
     // When
-    Mono<Message> mono = chatRoom.addMessage(messageId, user, messageText);
+    Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
 
     // Then
     assertThat(mono).emitsExactly(message);
@@ -135,10 +122,7 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
+    ChatRoomData chatRoomData = new ChatRoomData(
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -152,7 +136,7 @@ public class ChatRoomTest
     when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
 
     // When
-    Mono<Message> mono = chatRoom.addMessage(messageId, user, mutatedText);
+    Mono<Message> mono = chatRoomData.addMessage(messageId, user, mutatedText);
 
     // Then
     assertThat(mono).sendsError();
index 62dc08a..142f709 100644 (file)
@@ -22,7 +22,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI
       int bufferSize = 8;
 
       SimpleChatHome simpleChatHome = new SimpleChatHome(
-          getStorageStrategy().read(),
+          getStorageStrategy(),
           clock,
           bufferSize);
 
index 6936532..e619649 100644 (file)
@@ -27,7 +27,7 @@ public abstract class AbstractStorageStrategyIT
 
   protected void stop()
   {
-    getStorageStrategy().write(chathome.getChatRooms());
+    getStorageStrategy().write(chathome);
   }
 
   @Test
@@ -35,31 +35,30 @@ public abstract class AbstractStorageStrategyIT
   {
     start();
 
-    assertThat(chathome.getChatRooms().toStream()).hasSize(0);
+    assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
 
     UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
     ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
     log.debug("Created chat-room {}", info);
-    ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block();
+    ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
     Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
     Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
     Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
     Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
 
-
-    assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
-    assertThat(chathome.getChatRoom(chatRoomId)).emitsExactly(chatroom);
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+    assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
     assertThat(chathome
-        .getChatRoom(chatRoomId)
+        .getChatRoomData(chatRoomId)
         .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
 
     stop();
     start();
 
-    assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
-    assertThat(chathome.getChatRoom(chatRoomId)).emitsExactly(chatroom);
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+    assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
     assertThat(chathome
-        .getChatRoom(chatRoomId)
+        .getChatRoomData(chatRoomId)
         .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
   }
 
@@ -68,12 +67,12 @@ public abstract class AbstractStorageStrategyIT
   {
     start();
 
-    assertThat(chathome.getChatRooms().toStream()).hasSize(0);
+    assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
 
     UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
     ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
     log.debug("Created chat-room {}", infoA);
-    ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block();
+    ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
     Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
     Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
     Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
@@ -82,33 +81,33 @@ public abstract class AbstractStorageStrategyIT
     UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
     ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
     log.debug("Created chat-room {}", infoB);
-    ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block();
+    ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
     Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
     Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
     Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
     Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
 
-    assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
-    assertThat(chathome.getChatRoom(chatRoomAId)).emitsExactly(chatroomA);
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+    assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
     assertThat(chathome
-        .getChatRoom(chatRoomAId)
+        .getChatRoomData(chatRoomAId)
         .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
-    assertThat(chathome.getChatRoom(chatRoomBId)).emitsExactly(chatroomB);
+    assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB);
     assertThat(chathome
-        .getChatRoom(chatRoomBId)
+        .getChatRoomData(chatRoomBId)
         .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
 
     stop();
     start();
 
-    assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
-    assertThat(chathome.getChatRoom(chatRoomAId)).emitsExactly(chatroomA);
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+    assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
     assertThat(chathome
-        .getChatRoom(chatRoomAId)
+        .getChatRoomData(chatRoomAId)
         .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
-    assertThat(chathome.getChatRoom(chatRoomBId)).emitsExactly(chatroomB);
+    assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB);
     assertThat(chathome
-        .getChatRoom(chatRoomBId)
+        .getChatRoomData(chatRoomBId)
         .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
   }
 
index 1bb0870..be40eed 100644 (file)
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeEach;
 
@@ -32,10 +31,7 @@ public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT
     mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
     storageStrategy = new FilesStorageStrategy(
         path,
-        clock,
-        8,
         chatRoomId -> 0,
-        messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
 
index 7ca9cb2..a0dab37 100644 (file)
@@ -1,8 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository;
 import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,9 +36,9 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
   @Autowired
   MongoDbStorageStrategy storageStrategy;
   @Autowired
-  ChatRoomRepository repository;
+  ChatRoomRepository chatRoomRepository;
   @Autowired
-  Clock clock;
+  MessageRepository messageRepository;
 
 
   public InMemoryWithMongoDbStorageIT()
@@ -59,14 +59,13 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
     @Bean
     MongoDbStorageStrategy storageStrategy(
         ChatRoomRepository chatRoomRepository,
+        MessageRepository messageRepository,
         Clock clock)
     {
       return new MongoDbStorageStrategy(
           chatRoomRepository,
-          clock,
-          8,
-          chatRoomId -> 0,
-          messageFlux -> new InMemoryChatRoomService(messageFlux));
+          messageRepository,
+          chatRoomId -> 0);
     }
 
     @Bean
@@ -102,6 +101,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
   {
     Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
     CONTAINER.followOutput(logConsumer);
-    repository.deleteAll();
+    chatRoomRepository.deleteAll();
+    messageRepository.deleteAll();
   }
 }
index e2ffd3a..7da3b15 100644 (file)
@@ -27,7 +27,7 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest
           .of(ownedShards())
           .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
               shard,
-              storageStrategy.read(),
+              storageStrategy,
               clock,
               bufferSize()));
 
@@ -41,10 +41,7 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest
     {
       return new FilesStorageStrategy(
           Paths.get("target", "test-classes", "data", "files"),
-          clock,
-          bufferSize(),
           new KafkaLikeShardingStrategy(NUM_SHARDS),
-          messageFlux -> new InMemoryChatRoomService(messageFlux),
           new ObjectMapper());
     }
 
index 190d0f2..8be3173 100644 (file)
@@ -22,7 +22,7 @@ public class SimpleChatHomeTest extends ChatHomeTest
         Clock clock)
     {
       return new SimpleChatHome(
-          storageStrategy.read(),
+          storageStrategy,
           clock,
           bufferSize());
     }
@@ -32,10 +32,7 @@ public class SimpleChatHomeTest extends ChatHomeTest
     {
       return new FilesStorageStrategy(
           Paths.get("target", "test-classes", "data", "files"),
-          clock,
-          bufferSize(),
           chatRoomId -> 0,
-          messageFlux -> new InMemoryChatRoomService(messageFlux),
           new ObjectMapper());
     }
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java
deleted file mode 100644 (file)
index 33a8a50..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDateTime;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-public class MessageToTest
-{
-  @Test
-  void testFrom()
-  {
-    Message message = new Message(
-        Message.MessageKey.of("ute", 1l),
-        6l,
-        LocalDateTime.now(),
-        "foo");
-    MessageTo messageTo = MessageTo.from(message);
-    assertThat(messageTo.getId()).isEqualTo("ute--1");
-  }
-
-  @Test
-  void testToMessage()
-  {
-    MessageTo messageTo = new MessageTo(
-        "ute--1",
-        6l,
-        LocalDateTime.now().toString(),
-        "foo");
-    Message message = messageTo.toMessage();
-    assertThat(message.getId()).isEqualTo(1l);
-    assertThat(message.getUsername()).isEqualTo("ute");
-  }
-}
index 014a8be..ec4ebbf 100644 (file)
@@ -1,2 +1,3 @@
 #!/bin/bash
 mongoimport --collection=chatRoomTo --db=test /docker-entrypoint-initdb.d/chatRoomTo.json
+mongoimport --collection=messageTo --db=test /docker-entrypoint-initdb.d/messageTo.json
index ae15034..db26b89 100644 (file)
@@ -1,31 +1,6 @@
 {
   "_id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+  "shard": 0,
   "name": "FOO",
-  "messages": [
-    {
-      "_id": "peter--1",
-      "serial": 0,
-      "time": "2023-01-13T20:43:16.803382151",
-      "text": "Hallo, ich heiße Peter!"
-    },
-    {
-      "_id": "ute--1",
-      "serial": 1,
-      "time": "2023-01-13T20:43:16.804049969",
-      "text": "Ich bin Ute..."
-    },
-    {
-      "_id": "peter--2",
-      "serial": 2,
-      "time": "2023-01-13T20:43:16.804092782",
-      "text": "Willst du mit mir gehen?"
-    },
-    {
-      "_id": "klaus--1",
-      "serial": 3,
-      "time": "2023-01-13T20:43:16.804122604",
-      "text": "Ja? Nein? Vielleicht??"
-    }
-  ],
   "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomTo"
 }
diff --git a/src/test/resources/data/mongodb/messageTo.json b/src/test/resources/data/mongodb/messageTo.json
new file mode 100644 (file)
index 0000000..02fed6f
--- /dev/null
@@ -0,0 +1,40 @@
+{
+  "_id": "64f7952cecf06d750cad4b9c",
+  "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+  "user": "peter",
+  "id": 1,
+  "serial": 0,
+  "time": "2023-01-13T20:43:16.803382151",
+  "text": "Hallo, ich heiße Peter!",
+  "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}
+{
+  "_id": "64f7952cecf06d750cad4b9d",
+  "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+  "user": "ute",
+  "id": 1,
+  "serial": 1,
+  "time": "2023-01-13T20:43:16.804049969",
+  "text": "Ich bin Ute...",
+  "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}
+{
+  "_id": "64f7952cecf06d750cad4b9e",
+  "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+  "user": "peter",
+  "id": 2,
+  "serial": 2,
+  "time": "2023-01-13T20:43:16.804092782",
+  "text": "Willst du mit mir gehen?",
+  "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}
+{
+  "_id": "64f7953ddf7a9063e0b1f7dc",
+  "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+  "user": "klaus",
+  "id": 1,
+  "serial": 3,
+  "time": "2023-01-13T20:43:16.804122604",
+  "text": "Ja? Nein? Vielleicht??",
+  "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}