feat: Added a POST-mapping to force the storing of the data
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.Chatroom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import lombok.RequiredArgsConstructor;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.bind.annotation.*;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.time.Clock;
13 import java.time.LocalDateTime;
14 import java.util.Optional;
15 import java.util.UUID;
16 import java.util.stream.Stream;
17
18
19 @RestController
20 @RequiredArgsConstructor
21 public class ChatBackendController
22 {
23   private final ChatHome chatHome;
24   private final Clock clock;
25   private final StorageStrategy storageStrategy;
26
27
28   @PostMapping("create")
29   public ChatroomTo create(@RequestBody String name)
30   {
31     return ChatroomTo.from(chatHome.createChatroom(name));
32   }
33
34   @GetMapping("list")
35   public Stream<ChatroomTo> list()
36   {
37     return chatHome.list().map(chatroom -> ChatroomTo.from(chatroom));
38   }
39
40   @GetMapping("get/{chatroomId}")
41   public Optional<ChatroomTo> get(@PathVariable UUID chatroomId)
42   {
43     return chatHome.getChatroom(chatroomId).map(chatroom -> ChatroomTo.from(chatroom));
44   }
45
46   @PutMapping("put/{chatroomId}/{username}/{messageId}")
47   public Mono<MessageTo> put(
48       @PathVariable UUID chatroomId,
49       @PathVariable String username,
50       @PathVariable Long messageId,
51       @RequestBody String text)
52   {
53     return
54         chatHome
55             .getChatroom(chatroomId)
56             .map(chatroom -> put(chatroom, username, messageId, text))
57             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
58   }
59
60   public Mono<MessageTo> put(
61       Chatroom chatroom,
62       String username,
63       Long messageId,
64       String text)
65   {
66     return
67         chatroom
68             .addMessage(
69                 messageId,
70                 LocalDateTime.now(clock),
71                 username,
72                 text)
73             .switchIfEmpty(chatroom.getMessage(username, messageId))
74             .map(message -> MessageTo.from(message));
75   }
76
77   @GetMapping("get/{chatroomId}/{username}/{messageId}")
78   public Mono<MessageTo> get(
79       @PathVariable UUID chatroomId,
80       @PathVariable String username,
81       @PathVariable Long messageId)
82   {
83     return
84         chatHome
85             .getChatroom(chatroomId)
86             .map(chatroom -> get(chatroom, username, messageId))
87             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
88   }
89
90   private Mono<MessageTo> get(
91       Chatroom chatroom,
92       String username,
93       Long messageId)
94   {
95     return
96         chatroom
97             .getMessage(username, messageId)
98             .map(message -> MessageTo.from(message));
99   }
100
101   @GetMapping(
102       path = "listen/{chatroomId}",
103       produces = MediaType.TEXT_EVENT_STREAM_VALUE)
104   public Flux<MessageTo> listen(@PathVariable UUID chatroomId)
105   {
106     return chatHome
107         .getChatroom(chatroomId)
108         .map(chatroom -> listen(chatroom))
109         .orElseThrow(() -> new UnknownChatroomException(chatroomId));
110   }
111
112   private Flux<MessageTo> listen(Chatroom chatroom)
113   {
114     return chatroom
115         .listen()
116         .log()
117         .map(message -> MessageTo.from(message));
118   }
119
120   @PostMapping("/store")
121   public void store()
122   {
123     storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
124   }
125 }