WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / ShardedChatHomeService.java
index 06c197b..c281d9e 100644 (file)
@@ -8,6 +8,8 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -19,14 +21,20 @@ public class ShardedChatHomeService implements ChatHomeService
 {
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
+  private final String[] shardOwners;
   private final ShardingStrategy shardingStrategy;
 
 
   public ShardedChatHomeService(
       SimpleChatHomeService[] chatHomes,
+      URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
     this.chatHomes = chatHomes;
+    this.shardOwners = Arrays
+        .stream(shardOwners)
+        .map(uri -> uri.toASCIIString())
+        .toArray(size -> new String[size]);
     this.shardingStrategy = shardingStrategy;
     this.ownedShards = new HashSet<>();
     for (int shard = 0; shard < chatHomes.length; shard++)
@@ -90,6 +98,12 @@ public class ShardedChatHomeService implements ChatHomeService
                 : throwable);
   }
 
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
+
   private int selectShard(UUID chatroomId)
   {
     return shardingStrategy.selectShard(chatroomId);