feat: Reintroduced `ChatRoom.shard`, becaus it is needed as a routing-hint
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
index 2300219..d21fe2b 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -37,9 +39,11 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
           return new ChatRoom(
               chatRoomId,
               chatRoomTo.getName(),
+              shard,
               clock,
               factory.create(
                   Flux