WIP:haproxy
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / ShardedChatHomeService.java
index 09e4684..ab7f8d4 100644 (file)
@@ -1,13 +1,15 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
+package de.juplo.kafka.chat.backend.implementation.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -17,16 +19,25 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHomeService implements ChatHomeService
 {
+  private final String instanceId;
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
+  private final String[] shardOwners;
   private final ShardingStrategy shardingStrategy;
 
 
   public ShardedChatHomeService(
+      String instanceId,
       SimpleChatHomeService[] chatHomes,
+      URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
+    this.instanceId = instanceId;
     this.chatHomes = chatHomes;
+    this.shardOwners = Arrays
+        .stream(shardOwners)
+        .map(uri -> uri.toASCIIString())
+        .toArray(size -> new String[size]);
     this.shardingStrategy = shardingStrategy;
     this.ownedShards = new HashSet<>();
     for (int shard = 0; shard < chatHomes.length; shard++)
@@ -46,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = shardingStrategy.selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard].createChatRoom(id, name);
   }
 
@@ -55,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -79,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomData(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -90,15 +101,12 @@ public class ShardedChatHomeService implements ChatHomeService
                 : throwable);
   }
 
-  public Flux<ChatRoomData> getChatRoomData()
+  @Override
+  public Mono<String[]> getShardOwners()
   {
-    return Flux
-        .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRoomData());
+    return Mono.just(shardOwners);
   }
 
-
-
   private int selectShard(UUID chatroomId)
   {
     return shardingStrategy.selectShard(chatroomId);