8fccaa9bd88e95b041c859b6c858bddd8400b084
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.Chatroom;
5 import lombok.RequiredArgsConstructor;
6 import org.springframework.http.MediaType;
7 import org.springframework.web.bind.annotation.*;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.time.Clock;
12 import java.time.LocalDateTime;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.UUID;
16 import java.util.stream.Stream;
17
18
19 @RestController
20 @RequiredArgsConstructor
21 public class ChatBackendController
22 {
23   private final ChatHome chatHome;
24   private final Clock clock;
25
26
27   @PostMapping("create")
28   public ChatroomTo create(@RequestBody String name)
29   {
30     return ChatroomTo.from(chatHome.createChatroom(name));
31   }
32
33   @GetMapping("list")
34   public Stream<ChatroomTo> list()
35   {
36     return chatHome.list().map(chatroom -> ChatroomTo.from(chatroom));
37   }
38
39   @GetMapping("get/{chatroomId}")
40   public Optional<ChatroomTo> get(@PathVariable UUID chatroomId)
41   {
42     return chatHome.getChatroom(chatroomId).map(chatroom -> ChatroomTo.from(chatroom));
43   }
44
45   @PutMapping("put/{chatroomId}/{username}/{messageId}")
46   public Mono<MessageTo> put(
47       @PathVariable UUID chatroomId,
48       @PathVariable String username,
49       @PathVariable Long messageId,
50       @RequestBody String text)
51   {
52     return
53         chatHome
54             .getChatroom(chatroomId)
55             .map(chatroom -> put(chatroom, username, messageId, text))
56             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
57   }
58
59   public Mono<MessageTo> put(
60       Chatroom chatroom,
61       String username,
62       Long messageId,
63       String text)
64   {
65     return
66         chatroom
67             .addMessage(
68                 messageId,
69                 LocalDateTime.now(clock),
70                 username,
71                 text)
72             .switchIfEmpty(chatroom.getMessage(username, messageId))
73             .map(message -> MessageTo.from(message));
74   }
75
76   @GetMapping("get/{chatroomId}/{username}/{messageId}")
77   public Mono<MessageTo> get(
78       @PathVariable UUID chatroomId,
79       @PathVariable String username,
80       @PathVariable Long messageId)
81   {
82     return
83         chatHome
84             .getChatroom(chatroomId)
85             .map(chatroom -> get(chatroom, username, messageId))
86             .orElseThrow(() -> new UnknownChatroomException(chatroomId));
87   }
88
89   private Mono<MessageTo> get(
90       Chatroom chatroom,
91       String username,
92       Long messageId)
93   {
94     return
95         chatroom
96             .getMessage(username, messageId)
97             .map(message -> MessageTo.from(message));
98   }
99
100   @GetMapping(
101       path = "listen/{chatroomId}",
102       produces = MediaType.TEXT_EVENT_STREAM_VALUE)
103   public Flux<MessageTo> listen(@PathVariable UUID chatroomId)
104   {
105     return chatHome
106         .getChatroom(chatroomId)
107         .map(chatroom -> listen(chatroom))
108         .orElseThrow(() -> new UnknownChatroomException(chatroomId));
109   }
110
111   private Flux<MessageTo> listen(Chatroom chatroom)
112   {
113     return chatroom
114         .listen()
115         .log()
116         .map(message -> MessageTo.from(message));
117   }
118 }