WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
index da01a54..cf6d20a 100644 (file)
@@ -73,7 +73,7 @@ public class SimpleChatHomeService implements ChatHomeService
               info.getId(),
               new ChatRoomData(
                   clock,
-                  new InMemoryChatRoomService(messageFlux),
+                  new InMemoryChatMessageService(messageFlux),
                   bufferSize));
         });
     this.clock = clock;
@@ -85,7 +85,7 @@ public class SimpleChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
@@ -114,4 +114,10 @@ public class SimpleChatHomeService implements ChatHomeService
         .justOrEmpty(chatRoomData.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
+
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.empty();
+  }
 }