refactor: The `ChatRoom` determines the timestamp of a `Message`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
index 385c95c..cc6958f 100644 (file)
@@ -1,17 +1,17 @@
 package de.juplo.kafka.chat.backend.api;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.RequiredArgsConstructor;
-import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.time.Clock;
-import java.time.LocalDateTime;
-import java.util.Collection;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 
 @RestController
@@ -19,25 +19,36 @@ import java.util.UUID;
 public class ChatBackendController
 {
   private final ChatHome chatHome;
-  private final Clock clock;
+  private final StorageStrategy storageStrategy;
 
 
   @PostMapping("create")
-  public Chatroom create(@RequestBody String name)
+  public ChatRoomTo create(@RequestBody String name)
   {
-    return chatHome.createChatroom(name);
+    return ChatRoomTo.from(chatHome.createChatroom(name));
   }
 
   @GetMapping("list")
-  public Collection<Chatroom> list()
+  public Stream<ChatRoomTo> list()
   {
-    return chatHome.list();
+    return chatHome.list().map(chatroom -> ChatRoomTo.from(chatroom));
+  }
+
+  @GetMapping("list/{chatroomId}")
+  public Flux<MessageTo> list(@PathVariable UUID chatroomId)
+  {
+    return chatHome
+        .getChatroom(chatroomId)
+        .map(chatroom -> chatroom
+            .getMessages()
+            .map(MessageTo::from))
+        .get();
   }
 
   @GetMapping("get/{chatroomId}")
-  public Chatroom get(@PathVariable UUID chatroomId)
+  public Optional<ChatRoomTo> get(@PathVariable UUID chatroomId)
   {
-    return chatHome.getChatroom(chatroomId);
+    return chatHome.getChatroom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom));
   }
 
   @PutMapping("put/{chatroomId}/{username}/{messageId}")
@@ -47,12 +58,23 @@ public class ChatBackendController
       @PathVariable Long messageId,
       @RequestBody String text)
   {
-    Chatroom chatroom = chatHome.getChatroom(chatroomId);
+    return
+        chatHome
+            .getChatroom(chatroomId)
+            .map(chatroom -> put(chatroom, username, messageId, text))
+            .orElseThrow(() -> new UnknownChatroomException(chatroomId));
+  }
+
+  public Mono<MessageTo> put(
+      ChatRoom chatroom,
+      String username,
+      Long messageId,
+      String text)
+  {
     return
         chatroom
             .addMessage(
                 messageId,
-                LocalDateTime.now(clock),
                 username,
                 text)
             .switchIfEmpty(chatroom.getMessage(username, messageId))
@@ -68,19 +90,47 @@ public class ChatBackendController
     return
         chatHome
             .getChatroom(chatroomId)
+            .map(chatroom -> get(chatroom, username, messageId))
+            .orElseThrow(() -> new UnknownChatroomException(chatroomId));
+  }
+
+  private Mono<MessageTo> get(
+      ChatRoom chatroom,
+      String username,
+      Long messageId)
+  {
+    return
+        chatroom
             .getMessage(username, messageId)
             .map(message -> MessageTo.from(message));
   }
 
-  @GetMapping(
-      path = "listen/{chatroomId}",
-      produces = MediaType.TEXT_EVENT_STREAM_VALUE)
-  public Flux<MessageTo> listen(@PathVariable UUID chatroomId)
+  @GetMapping(path = "listen/{chatroomId}")
+  public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
   {
     return chatHome
         .getChatroom(chatroomId)
+        .map(chatroom -> listen(chatroom))
+        .orElseThrow(() -> new UnknownChatroomException(chatroomId));
+  }
+
+  private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
+  {
+    return chatroom
         .listen()
         .log()
-        .map(message -> MessageTo.from(message));
+        .map(message -> MessageTo.from(message))
+        .map(messageTo ->
+            ServerSentEvent
+                .builder(messageTo)
+                .id(messageTo.getSerial().toString())
+                .event("message")
+                .build());
+  }
+
+  @PostMapping("/store")
+  public void store()
+  {
+    storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
   }
 }