TMP:Holzweg so, Refaktorisierung nötig
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
index 8429fe8..8318c6c 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -18,11 +19,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
   @Override
-  public void write(Flux<ChatRoom> chatroomFlux)
+  public void write(Flux<ChatRoomInfo> chatroomFlux)
   {
     chatroomFlux
         .map(ChatRoomTo::from)
@@ -34,15 +36,20 @@ public class MongoDbStorageStrategy implements StorageStrategy
   {
     return Flux
         .fromIterable(repository.findAll())
-        .map(chatRoomTo -> new ChatRoom(
-            UUID.fromString(chatRoomTo.getId()),
-            chatRoomTo.getName(),
-            chatRoomTo.getShard(),
-            clock,
-            factory.create(
-                Flux
-                    .fromIterable(chatRoomTo.getMessages())
-                    .map(messageTo -> messageTo.toMessage())),
-            bufferSize));
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
+          return new ChatRoom(
+              chatRoomId,
+              chatRoomTo.getName(),
+              shard,
+              clock,
+              factory.create(
+                  Flux
+                      .fromIterable(chatRoomTo.getMessages())
+                      .map(messageTo -> messageTo.toMessage())),
+              bufferSize);
+        });
   }
 }