feat: Reintroduced `ChatRoom.shard`, becaus it is needed as a routing-hint
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
index 8429fe8..d21fe2b 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -34,15 +36,20 @@ public class MongoDbStorageStrategy implements StorageStrategy
   {
     return Flux
         .fromIterable(repository.findAll())
-        .map(chatRoomTo -> new ChatRoom(
-            UUID.fromString(chatRoomTo.getId()),
-            chatRoomTo.getName(),
-            chatRoomTo.getShard(),
-            clock,
-            factory.create(
-                Flux
-                    .fromIterable(chatRoomTo.getMessages())
-                    .map(messageTo -> messageTo.toMessage())),
-            bufferSize));
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
+          return new ChatRoom(
+              chatRoomId,
+              chatRoomTo.getName(),
+              shard,
+              clock,
+              factory.create(
+                  Flux
+                      .fromIterable(chatRoomTo.getMessages())
+                      .map(messageTo -> messageTo.toMessage())),
+              bufferSize);
+        });
   }
 }