TMP:Holzweg so, Refaktorisierung nötig
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
index 2300219..8318c6c 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -18,11 +19,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
   @Override
-  public void write(Flux<ChatRoom> chatroomFlux)
+  public void write(Flux<ChatRoomInfo> chatroomFlux)
   {
     chatroomFlux
         .map(ChatRoomTo::from)
@@ -37,9 +39,11 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
           return new ChatRoom(
               chatRoomId,
               chatRoomTo.getName(),
+              shard,
               clock,
               factory.create(
                   Flux