1 package de.juplo.kafka.chat.backend.api;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.Chatroom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import lombok.RequiredArgsConstructor;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.bind.annotation.*;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
12 import java.time.Clock;
13 import java.time.LocalDateTime;
14 import java.util.Optional;
15 import java.util.UUID;
16 import java.util.stream.Stream;
/**
 * Web controller that exposes chatroom and message operations through the
 * mapping annotations used on its methods ({@code @PostMapping},
 * {@code @GetMapping}, {@code @PutMapping}).
 *
 * Lombok's {@code @RequiredArgsConstructor} generates the constructor that
 * receives the three final fields declared below.
 */
20 @RequiredArgsConstructor
21 public class ChatBackendController
// Used by the handlers for chatroom lookup (getChatroom), listing (list) and creation (createChatroom).
23 private final ChatHome chatHome;
// Time source handed to LocalDateTime.now(clock) when a message is put.
24 private final Clock clock;
// Receives the chatrooms passed to writeChatrooms(...) by the "/store" handler.
25 private final StorageStrategy storageStrategy;
/**
 * Handles POST "create": delegates to {@code chatHome.createChatroom(name)}
 * and converts the result with {@code ChatroomTo.from}.
 *
 * @param name the raw request body, forwarded unchanged to
 *             {@code createChatroom}
 * @return the transfer object produced by {@code ChatroomTo.from}
 */
28 @PostMapping("create")
29 public ChatroomTo create(@RequestBody String name)
31 return ChatroomTo.from(chatHome.createChatroom(name));
/**
 * Returns the elements of {@code chatHome.list()}, each converted with
 * {@code ChatroomTo.from}.
 *
 * NOTE(review): no mapping annotation is visible directly above this method
 * in this view - confirm which route it is bound to.
 *
 * @return a stream of transfer objects, one per element of
 *         {@code chatHome.list()}
 */
35 public Stream<ChatroomTo> list()
37 return chatHome.list().map(chatroom -> ChatroomTo.from(chatroom));
/**
 * Handles GET "get/{chatroomId}": looks the chatroom up through
 * {@code chatHome.getChatroom(chatroomId)} and converts a hit with
 * {@code ChatroomTo.from}.
 *
 * Unlike the message handlers in this class, a failed lookup is not turned
 * into an {@code UnknownChatroomException}; the empty result is returned.
 * NOTE(review): how an empty Optional is rendered in the HTTP response is
 * not determined here - confirm.
 *
 * @param chatroomId id taken from the path
 * @return the converted chatroom, or an empty Optional when the lookup
 *         yields nothing
 */
40 @GetMapping("get/{chatroomId}")
41 public Optional<ChatroomTo> get(@PathVariable UUID chatroomId)
43 return chatHome.getChatroom(chatroomId).map(chatroom -> ChatroomTo.from(chatroom));
/**
 * Handles PUT "put/{chatroomId}/{username}/{messageId}": looks up the
 * chatroom and forwards the request to the
 * {@code put(chatroom, username, messageId, text)} overload.
 *
 * @param chatroomId id of the target chatroom, taken from the path
 * @param username   taken from the path and forwarded to the overload
 * @param messageId  taken from the path and forwarded to the overload
 * @param text       the raw request body, forwarded to the overload
 * @return whatever the overload returns for the resolved chatroom
 * @throws UnknownChatroomException if the lookup for {@code chatroomId}
 *         yields no chatroom
 */
46 @PutMapping("put/{chatroomId}/{username}/{messageId}")
47 public Mono<MessageTo> put(
48 @PathVariable UUID chatroomId,
49 @PathVariable String username,
50 @PathVariable Long messageId,
51 @RequestBody String text)
// NOTE(review): the receiver of getChatroom is not visible in this view -
// presumably chatHome, as in get(UUID); confirm.
55 .getChatroom(chatroomId)
56 .map(chatroom -> put(chatroom, username, messageId, text))
// An empty lookup is reported as an exception rather than an empty result.
57 .orElseThrow(() -> new UnknownChatroomException(chatroomId));
/**
 * Overload that the PUT handler calls as
 * {@code put(chatroom, username, messageId, text)}.
 *
 * NOTE(review): the parameter list and the beginning of the call chain are
 * not visible in this view; only the timestamp argument, the fallback and
 * the final conversion can be documented with certainty.
 * NOTE(review): this overload is public, while the comparable get(...) and
 * listen(...) helpers in this class are private - confirm that the wider
 * visibility is intended.
 *
 * @return the resulting message converted with {@code MessageTo.from}
 */
60 public Mono<MessageTo> put(
// The timestamp is read from the injected clock, not from the system clock directly.
70 LocalDateTime.now(clock),
// Fallback for the case that the preceding step completes without a value:
// the message is then looked up by (username, messageId).
// As an ordinary method argument, the getMessage(...) call itself is
// evaluated every time this line runs; whether that already does any work
// depends on getMessage - TODO confirm.
73 .switchIfEmpty(chatroom.getMessage(username, messageId))
74 .map(message -> MessageTo.from(message));
/**
 * Handles GET "get/{chatroomId}/{username}/{messageId}": looks up the
 * chatroom and forwards the request to the
 * {@code get(chatroom, username, messageId)} overload.
 *
 * @param chatroomId id of the chatroom, taken from the path
 * @param username   taken from the path and forwarded to the overload
 * @param messageId  taken from the path and forwarded to the overload
 * @return whatever the overload returns for the resolved chatroom
 * @throws UnknownChatroomException if the lookup for {@code chatroomId}
 *         yields no chatroom
 */
77 @GetMapping("get/{chatroomId}/{username}/{messageId}")
78 public Mono<MessageTo> get(
79 @PathVariable UUID chatroomId,
80 @PathVariable String username,
81 @PathVariable Long messageId)
// NOTE(review): the receiver of getChatroom is not visible in this view -
// presumably chatHome; confirm.
85 .getChatroom(chatroomId)
86 .map(chatroom -> get(chatroom, username, messageId))
// An empty lookup is reported as an exception rather than an empty result.
87 .orElseThrow(() -> new UnknownChatroomException(chatroomId));
/**
 * Private overload that the message GET handler calls as
 * {@code get(chatroom, username, messageId)}.
 *
 * NOTE(review): the parameter list and the receiver of getMessage are not
 * visible in this view - presumably the chatroom argument; confirm.
 *
 * @return the result of {@code getMessage(username, messageId)} converted
 *         with {@code MessageTo.from}
 */
90 private Mono<MessageTo> get(
97 .getMessage(username, messageId)
98 .map(message -> MessageTo.from(message));
// Handler for "listen/{chatroomId}": looks up the chatroom and returns the
// Flux produced by listen(chatroom). Throws UnknownChatroomException when
// the lookup for chatroomId yields no chatroom.
// The response is declared with MediaType.TEXT_EVENT_STREAM_VALUE.
// NOTE(review): the opening line of the mapping annotation is not visible in
// this view - confirm the HTTP method it maps.
102 path = "listen/{chatroomId}",
103 produces = MediaType.TEXT_EVENT_STREAM_VALUE)
104 public Flux<MessageTo> listen(@PathVariable UUID chatroomId)
// NOTE(review): the receiver of getChatroom is not visible in this view -
// presumably chatHome; confirm.
107 .getChatroom(chatroomId)
108 .map(chatroom -> listen(chatroom))
109 .orElseThrow(() -> new UnknownChatroomException(chatroomId));
/**
 * Private overload that the listen handler calls with the resolved chatroom.
 *
 * NOTE(review): the expression that supplies the messages is not visible in
 * this view; only the final conversion step is shown.
 *
 * @param chatroom the chatroom resolved by the handler
 * @return the messages converted with {@code MessageTo.from}
 */
112 private Flux<MessageTo> listen(Chatroom chatroom)
117 .map(message -> MessageTo.from(message));
/**
 * Handles POST "/store": wraps {@code chatHome.list()} in a Flux and hands
 * it to {@code storageStrategy.writeChatrooms}.
 *
 * NOTE(review): the method signature line is not visible in this view.
 * NOTE(review): this path starts with a slash, while the other mappings in
 * this class ("create", "get/...", "put/...", "listen/...") do not -
 * confirm that the difference is intended.
 */
120 @PostMapping("/store")
123 storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));