refactor: The `ChatRoom` determines the timestamp of a `Message`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import lombok.RequiredArgsConstructor;
7 import org.springframework.http.codec.ServerSentEvent;
8 import org.springframework.web.bind.annotation.*;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.util.Optional;
13 import java.util.UUID;
14 import java.util.stream.Stream;
15
16
17 @RestController
18 @RequiredArgsConstructor
19 public class ChatBackendController
20 {
21   private final ChatHome chatHome;
22   private final StorageStrategy storageStrategy;
23
24
25   @PostMapping("create")
26   public ChatRoomTo create(@RequestBody String name)
27   {
28     return ChatRoomTo.from(chatHome.createChatroom(name));
29   }
30
31   @GetMapping("list")
32   public Stream<ChatRoomTo> list()
33   {
34     return chatHome.list().map(chatroom -> ChatRoomTo.from(chatroom));
35   }
36
37   @GetMapping("list/{chatroomId}")
38   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
39   {
40     return chatHome
41         .getChatroom(chatroomId)
42         .map(chatroom -> chatroom
43             .getMessages()
44             .map(MessageTo::from))
45         .get();
46   }
47
48   @GetMapping("get/{chatroomId}")
49   public Optional<ChatRoomTo> get(@PathVariable UUID chatroomId)
50   {
51     return chatHome.getChatroom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom));
52   }
53
54   @PutMapping("put/{chatroomId}/{username}/{messageId}")
55   public Mono<MessageTo> put(
56       @PathVariable UUID chatroomId,
57       @PathVariable String username,
58       @PathVariable Long messageId,
59       @RequestBody String text)
60   {
61     return
62         chatHome
63             .getChatroom(chatroomId)
64             .map(chatroom -> put(chatroom, username, messageId, text))
65             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
66   }
67
68   public Mono<MessageTo> put(
69       ChatRoom chatroom,
70       String username,
71       Long messageId,
72       String text)
73   {
74     return
75         chatroom
76             .addMessage(
77                 messageId,
78                 username,
79                 text)
80             .switchIfEmpty(chatroom.getMessage(username, messageId))
81             .map(message -> MessageTo.from(message));
82   }
83
84   @GetMapping("get/{chatroomId}/{username}/{messageId}")
85   public Mono<MessageTo> get(
86       @PathVariable UUID chatroomId,
87       @PathVariable String username,
88       @PathVariable Long messageId)
89   {
90     return
91         chatHome
92             .getChatroom(chatroomId)
93             .map(chatroom -> get(chatroom, username, messageId))
94             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
95   }
96
97   private Mono<MessageTo> get(
98       ChatRoom chatroom,
99       String username,
100       Long messageId)
101   {
102     return
103         chatroom
104             .getMessage(username, messageId)
105             .map(message -> MessageTo.from(message));
106   }
107
108   @GetMapping(path = "listen/{chatroomId}")
109   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
110   {
111     return chatHome
112         .getChatroom(chatroomId)
113         .map(chatroom -> listen(chatroom))
114         .orElseThrow(() -> new UnknownChatroomException(chatroomId));
115   }
116
117   private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
118   {
119     return chatroom
120         .listen()
121         .log()
122         .map(message -> MessageTo.from(message))
123         .map(messageTo ->
124             ServerSentEvent
125                 .builder(messageTo)
126                 .id(messageTo.getSerial().toString())
127                 .event("message")
128                 .build());
129   }
130
131   @PostMapping("/store")
132   public void store()
133   {
134     storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
135   }
136 }