WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaChatHomeService.java
index 5019ed2..9409716 100644 (file)
@@ -18,45 +18,53 @@ import java.util.*;
 public class KafkaChatHomeService implements ChatHomeService
 {
   private final int numPartitions;
-  private final ChatRoomChannel chatRoomChannel;
+  private final InfoChannel infoChannel;
+  private final DataChannel dataChannel;
 
 
 
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+    int shard = selectShard(id);
+    log.info(
+        "Sending create-command for chat rooom: id={}, name={}, shard={}",
+        id,
+        name,
+        shard);
+    return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
   }
 
   @Override
   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomInfo(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
+    return infoChannel
+        .getChatRoomInfo(id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
   public Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return chatRoomChannel.getChatRoomInfo();
+    return infoChannel.getChatRoomInfo();
   }
 
   @Override
   public Mono<ChatRoomData> getChatRoomData(UUID id)
   {
     int shard = selectShard(id);
-    return chatRoomChannel
+    return dataChannel
         .getChatRoomData(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
-            chatRoomChannel.getOwnedShards())));
+            dataChannel.getOwnedShards())));
+  }
+
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return infoChannel.getShardOwners();
   }
 
   int selectShard(UUID chatRoomId)