feat: Implemented and tested `MongoDbStorageStrategy`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
index 4eea645..96515bf 100644 (file)
@@ -28,7 +28,7 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   {
     log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
     this.chatrooms = new HashMap<>();
-    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+    chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
     this.clock = clock;
     this.bufferSize = bufferSize;
   }