feat: Server sends real Server-Sent-Events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.Chatroom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import lombok.RequiredArgsConstructor;
7 import org.springframework.http.MediaType;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.Clock;
14 import java.time.LocalDateTime;
15 import java.util.Optional;
16 import java.util.UUID;
17 import java.util.stream.Stream;
18
19
20 @RestController
21 @RequiredArgsConstructor
22 public class ChatBackendController
23 {
24   private final ChatHome chatHome;
25   private final Clock clock;
26   private final StorageStrategy storageStrategy;
27
28
29   @PostMapping("create")
30   public ChatroomTo create(@RequestBody String name)
31   {
32     return ChatroomTo.from(chatHome.createChatroom(name));
33   }
34
35   @GetMapping("list")
36   public Stream<ChatroomTo> list()
37   {
38     return chatHome.list().map(chatroom -> ChatroomTo.from(chatroom));
39   }
40
41   @GetMapping("get/{chatroomId}")
42   public Optional<ChatroomTo> get(@PathVariable UUID chatroomId)
43   {
44     return chatHome.getChatroom(chatroomId).map(chatroom -> ChatroomTo.from(chatroom));
45   }
46
47   @PutMapping("put/{chatroomId}/{username}/{messageId}")
48   public Mono<MessageTo> put(
49       @PathVariable UUID chatroomId,
50       @PathVariable String username,
51       @PathVariable Long messageId,
52       @RequestBody String text)
53   {
54     return
55         chatHome
56             .getChatroom(chatroomId)
57             .map(chatroom -> put(chatroom, username, messageId, text))
58             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
59   }
60
61   public Mono<MessageTo> put(
62       Chatroom chatroom,
63       String username,
64       Long messageId,
65       String text)
66   {
67     return
68         chatroom
69             .addMessage(
70                 messageId,
71                 LocalDateTime.now(clock),
72                 username,
73                 text)
74             .switchIfEmpty(chatroom.getMessage(username, messageId))
75             .map(message -> MessageTo.from(message));
76   }
77
78   @GetMapping("get/{chatroomId}/{username}/{messageId}")
79   public Mono<MessageTo> get(
80       @PathVariable UUID chatroomId,
81       @PathVariable String username,
82       @PathVariable Long messageId)
83   {
84     return
85         chatHome
86             .getChatroom(chatroomId)
87             .map(chatroom -> get(chatroom, username, messageId))
88             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
89   }
90
91   private Mono<MessageTo> get(
92       Chatroom chatroom,
93       String username,
94       Long messageId)
95   {
96     return
97         chatroom
98             .getMessage(username, messageId)
99             .map(message -> MessageTo.from(message));
100   }
101
102   @GetMapping(path = "listen/{chatroomId}")
103   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
104   {
105     return chatHome
106         .getChatroom(chatroomId)
107         .map(chatroom -> listen(chatroom))
108         .orElseThrow(() -> new UnknownChatroomException(chatroomId));
109   }
110
111   private Flux<ServerSentEvent<MessageTo>> listen(Chatroom chatroom)
112   {
113     return chatroom
114         .listen()
115         .log()
116         .map(message -> MessageTo.from(message))
117         .map(messageTo ->
118             ServerSentEvent
119                 .builder(messageTo)
120                 .id(messageTo.getSerial().toString())
121                 .event("message")
122                 .build());
123   }
124
125   @PostMapping("/store")
126   public void store()
127   {
128     storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
129   }
130 }