31a2035c78df0d249eb8eab33fdfe9430151dbc6
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
7 import lombok.RequiredArgsConstructor;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.util.Optional;
14 import java.util.UUID;
15 import java.util.stream.Stream;
16
17
18 @RestController
19 @RequiredArgsConstructor
20 public class ChatBackendController
21 {
22   private final ChatHome chatHome;
23   private final StorageStrategy storageStrategy;
24
25
26   @PostMapping("create")
27   public ChatRoomTo create(@RequestBody String name)
28   {
29     return ChatRoomTo.from(chatHome.createChatroom(name));
30   }
31
32   @GetMapping("list")
33   public Stream<ChatRoomTo> list()
34   {
35     return chatHome.list().map(chatroom -> ChatRoomTo.from(chatroom));
36   }
37
38   @GetMapping("list/{chatroomId}")
39   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
40   {
41     return chatHome
42         .getChatroom(chatroomId)
43         .map(chatroom -> chatroom
44             .getMessages()
45             .map(MessageTo::from))
46         .get();
47   }
48
49   @GetMapping("get/{chatroomId}")
50   public Optional<ChatRoomTo> get(@PathVariable UUID chatroomId)
51   {
52     return chatHome.getChatroom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom));
53   }
54
55   @PutMapping("put/{chatroomId}/{username}/{messageId}")
56   public Mono<MessageTo> put(
57       @PathVariable UUID chatroomId,
58       @PathVariable String username,
59       @PathVariable Long messageId,
60       @RequestBody String text)
61   {
62     return
63         chatHome
64             .getChatroom(chatroomId)
65             .map(chatroom -> put(chatroom, username, messageId, text))
66             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
67   }
68
69   public Mono<MessageTo> put(
70       ChatRoom chatroom,
71       String username,
72       Long messageId,
73       String text)
74   {
75     return
76         chatroom
77             .addMessage(
78                 messageId,
79                 username,
80                 text)
81             .switchIfEmpty(chatroom.getMessage(username, messageId))
82             .map(message -> MessageTo.from(message));
83   }
84
85   @GetMapping("get/{chatroomId}/{username}/{messageId}")
86   public Mono<MessageTo> get(
87       @PathVariable UUID chatroomId,
88       @PathVariable String username,
89       @PathVariable Long messageId)
90   {
91     return
92         chatHome
93             .getChatroom(chatroomId)
94             .map(chatroom -> get(chatroom, username, messageId))
95             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
96   }
97
98   private Mono<MessageTo> get(
99       ChatRoom chatroom,
100       String username,
101       Long messageId)
102   {
103     return
104         chatroom
105             .getMessage(username, messageId)
106             .map(message -> MessageTo.from(message));
107   }
108
109   @GetMapping(path = "listen/{chatroomId}")
110   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
111   {
112     return chatHome
113         .getChatroom(chatroomId)
114         .map(chatroom -> listen(chatroom))
115         .orElseThrow(() -> new UnknownChatroomException(chatroomId));
116   }
117
118   private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
119   {
120     return chatroom
121         .listen()
122         .log()
123         .map(message -> MessageTo.from(message))
124         .map(messageTo ->
125             ServerSentEvent
126                 .builder(messageTo)
127                 .id(messageTo.getSerial().toString())
128                 .event("message")
129                 .build());
130   }
131
132   @PostMapping("/store")
133   public void store()
134   {
135     storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
136   }
137 }