WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
index 61b36ee..cf6d20a 100644 (file)
@@ -73,7 +73,7 @@ public class SimpleChatHomeService implements ChatHomeService
               info.getId(),
               new ChatRoomData(
                   clock,
-                  new InMemoryChatRoomService(messageFlux),
+                  new InMemoryChatMessageService(messageFlux),
                   bufferSize));
         });
     this.clock = clock;
@@ -85,7 +85,7 @@ public class SimpleChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
@@ -115,8 +115,9 @@ public class SimpleChatHomeService implements ChatHomeService
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
-  public Flux<ChatRoomData> getChatRoomData()
+  @Override
+  public Mono<String[]> getShardOwners()
   {
-    return Flux.fromIterable(chatRoomData.values());
+    return Mono.empty();
   }
 }