refactor: One stream -> using `flatMap` instead of an inner `subscribe`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / ShardedChatHomeService.java
index c281d9e..9a38453 100644 (file)
@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHomeService implements ChatHomeService
 {
+  private final String instanceId;
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
   private final String[] shardOwners;
@@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService
 
 
   public ShardedChatHomeService(
+      String instanceId,
       SimpleChatHomeService[] chatHomes,
       URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
+    this.instanceId = instanceId;
     this.chatHomes = chatHomes;
     this.shardOwners = Arrays
         .stream(shardOwners)
@@ -40,12 +43,7 @@ public class ShardedChatHomeService implements ChatHomeService
     for (int shard = 0; shard < chatHomes.length; shard++)
       if(chatHomes[shard] != null)
         this.ownedShards.add(shard);
-    log.info(
-        "Created ShardedChatHome for shards: {}",
-        ownedShards
-            .stream()
-            .map(String::valueOf)
-            .collect(Collectors.joining(", ")));
+    log.info("Created {}", this);
   }
 
 
@@ -54,7 +52,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = shardingStrategy.selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard].createChatRoom(id, name);
   }
 
@@ -63,7 +61,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -87,7 +85,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomData(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -108,4 +106,18 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     return shardingStrategy.selectShard(chatroomId);
   }
+
+  @Override
+  public String toString()
+  {
+    StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName());
+    stringBuffer.append(", shards=[");
+    stringBuffer.append(ownedShards
+        .stream()
+        .sorted()
+        .map(String::valueOf)
+        .collect(Collectors.joining(",")));
+    stringBuffer.append("]");
+    return stringBuffer.toString();
+  }
 }