refactor: One stream -> using `flatMap` instead of an inner `subscribe`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / ShardedChatHomeService.java
index 06c197b..9a38453 100644 (file)
@@ -8,6 +8,8 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -17,27 +19,31 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHomeService implements ChatHomeService
 {
+  private final String instanceId;
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
+  private final String[] shardOwners;
   private final ShardingStrategy shardingStrategy;
 
 
   public ShardedChatHomeService(
+      String instanceId,
       SimpleChatHomeService[] chatHomes,
+      URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
+    this.instanceId = instanceId;
     this.chatHomes = chatHomes;
+    this.shardOwners = Arrays
+        .stream(shardOwners)
+        .map(uri -> uri.toASCIIString())
+        .toArray(size -> new String[size]);
     this.shardingStrategy = shardingStrategy;
     this.ownedShards = new HashSet<>();
     for (int shard = 0; shard < chatHomes.length; shard++)
       if(chatHomes[shard] != null)
         this.ownedShards.add(shard);
-    log.info(
-        "Created ShardedChatHome for shards: {}",
-        ownedShards
-            .stream()
-            .map(String::valueOf)
-            .collect(Collectors.joining(", ")));
+    log.info("Created {}", this);
   }
 
 
@@ -46,7 +52,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = shardingStrategy.selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard].createChatRoom(id, name);
   }
 
@@ -55,7 +61,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -79,7 +85,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomData(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -90,8 +96,28 @@ public class ShardedChatHomeService implements ChatHomeService
                 : throwable);
   }
 
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
+
   private int selectShard(UUID chatroomId)
   {
     return shardingStrategy.selectShard(chatroomId);
   }
+
+  @Override
+  public String toString()
+  {
+    StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName());
+    stringBuffer.append(", shards=[");
+    stringBuffer.append(ownedShards
+        .stream()
+        .sorted()
+        .map(String::valueOf)
+        .collect(Collectors.joining(",")));
+    stringBuffer.append("]");
+    return stringBuffer.toString();
+  }
 }