feat: Added an endpoint that lists all messages of a chatroom
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.Chatroom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import lombok.RequiredArgsConstructor;
7 import org.springframework.http.MediaType;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.Clock;
14 import java.time.LocalDateTime;
15 import java.util.Optional;
16 import java.util.UUID;
17 import java.util.stream.Stream;
18
19
20 @RestController
21 @RequiredArgsConstructor
22 public class ChatBackendController
23 {
24   private final ChatHome chatHome;
25   private final Clock clock;
26   private final StorageStrategy storageStrategy;
27
28
29   @PostMapping("create")
30   public ChatroomTo create(@RequestBody String name)
31   {
32     return ChatroomTo.from(chatHome.createChatroom(name));
33   }
34
35   @GetMapping("list")
36   public Stream<ChatroomTo> list()
37   {
38     return chatHome.list().map(chatroom -> ChatroomTo.from(chatroom));
39   }
40
41   @GetMapping("list/{chatroomId}")
42   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
43   {
44     return chatHome
45         .getChatroom(chatroomId)
46         .map(chatroom -> chatroom
47             .getMessages()
48             .map(MessageTo::from))
49         .get();
50   }
51
52   @GetMapping("get/{chatroomId}")
53   public Optional<ChatroomTo> get(@PathVariable UUID chatroomId)
54   {
55     return chatHome.getChatroom(chatroomId).map(chatroom -> ChatroomTo.from(chatroom));
56   }
57
58   @PutMapping("put/{chatroomId}/{username}/{messageId}")
59   public Mono<MessageTo> put(
60       @PathVariable UUID chatroomId,
61       @PathVariable String username,
62       @PathVariable Long messageId,
63       @RequestBody String text)
64   {
65     return
66         chatHome
67             .getChatroom(chatroomId)
68             .map(chatroom -> put(chatroom, username, messageId, text))
69             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
70   }
71
72   public Mono<MessageTo> put(
73       Chatroom chatroom,
74       String username,
75       Long messageId,
76       String text)
77   {
78     return
79         chatroom
80             .addMessage(
81                 messageId,
82                 LocalDateTime.now(clock),
83                 username,
84                 text)
85             .switchIfEmpty(chatroom.getMessage(username, messageId))
86             .map(message -> MessageTo.from(message));
87   }
88
89   @GetMapping("get/{chatroomId}/{username}/{messageId}")
90   public Mono<MessageTo> get(
91       @PathVariable UUID chatroomId,
92       @PathVariable String username,
93       @PathVariable Long messageId)
94   {
95     return
96         chatHome
97             .getChatroom(chatroomId)
98             .map(chatroom -> get(chatroom, username, messageId))
99             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
100   }
101
102   private Mono<MessageTo> get(
103       Chatroom chatroom,
104       String username,
105       Long messageId)
106   {
107     return
108         chatroom
109             .getMessage(username, messageId)
110             .map(message -> MessageTo.from(message));
111   }
112
113   @GetMapping(path = "listen/{chatroomId}")
114   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
115   {
116     return chatHome
117         .getChatroom(chatroomId)
118         .map(chatroom -> listen(chatroom))
119         .orElseThrow(() -> new UnknownChatroomException(chatroomId));
120   }
121
122   private Flux<ServerSentEvent<MessageTo>> listen(Chatroom chatroom)
123   {
124     return chatroom
125         .listen()
126         .log()
127         .map(message -> MessageTo.from(message))
128         .map(messageTo ->
129             ServerSentEvent
130                 .builder(messageTo)
131                 .id(messageTo.getSerial().toString())
132                 .event("message")
133                 .build());
134   }
135
136   @PostMapping("/store")
137   public void store()
138   {
139     storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
140   }
141 }