refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatHome.java
index 84b9e94..ed4d8c7 100644 (file)
@@ -1,16 +1,25 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
 
 import java.util.*;
+import java.util.stream.Stream;
 
 
-@RequiredArgsConstructor
+@Slf4j
 public class ChatHome
 {
-  private final Map<UUID, Chatroom> chatrooms = new HashMap<>();
+  private final Map<UUID, Chatroom> chatrooms;
   private final ChatroomFactory factory;
 
+  public ChatHome(ChatroomFactory factory, Flux<Chatroom> chatroomFlux)
+  {
+    log.debug("Creating ChatHome with factory: {}", factory);
+    this.factory = factory;
+    this.chatrooms = new HashMap<>();
+    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+  }
 
   public Chatroom createChatroom(String name)
   {
@@ -24,8 +33,8 @@ public class ChatHome
     return Optional.ofNullable(chatrooms.get(id));
   }
 
-  public Collection<Chatroom> list()
+  public Stream<Chatroom> list()
   {
-    return chatrooms.values();
+    return chatrooms.values().stream();
   }
 }