1 package de.juplo.kafka.chat.backend.api;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
6 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
7 import lombok.RequiredArgsConstructor;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.util.UUID;
17 @RequiredArgsConstructor
18 public class ChatBackendController
20 private final ChatHome[] chatHomes;
21 private final ShardingStrategy selectionStrategy;
22 private final ChatRoomFactory factory;
23 private final StorageStrategy storageStrategy;
26 @PostMapping("create")
27 public Mono<ChatRoomTo> create(@RequestBody String name)
29 UUID chatRoomId = UUID.randomUUID();
31 .createChatRoom(chatRoomId, name)
32 .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
33 .map(ChatRoomTo::from);
37 public Flux<ChatRoomTo> list()
41 .flatMap(chatHome -> chatHome.getChatRooms())
42 .map(chatroom -> ChatRoomTo.from(chatroom));
45 @GetMapping("{chatroomId}/list")
46 public Flux<MessageTo> list(@PathVariable UUID chatroomId)
48 return chatHomes[selectShard(chatroomId)]
49 .getChatRoom(chatroomId)
50 .flatMapMany(chatroom -> chatroom
52 .map(MessageTo::from));
55 @GetMapping("{chatroomId}")
56 public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
58 return chatHomes[selectShard(chatroomId)]
59 .getChatRoom(chatroomId)
60 .map(chatroom -> ChatRoomTo.from(chatroom));
63 @PutMapping("{chatroomId}/{username}/{messageId}")
64 public Mono<MessageTo> put(
65 @PathVariable UUID chatroomId,
66 @PathVariable String username,
67 @PathVariable Long messageId,
68 @RequestBody String text)
71 chatHomes[selectShard(chatroomId)]
72 .getChatRoom(chatroomId)
73 .flatMap(chatroom -> put(chatroom, username, messageId, text));
76 public Mono<MessageTo> put(
88 .map(message -> MessageTo.from(message));
91 @GetMapping("{chatroomId}/{username}/{messageId}")
92 public Mono<MessageTo> get(
93 @PathVariable UUID chatroomId,
94 @PathVariable String username,
95 @PathVariable Long messageId)
98 chatHomes[selectShard(chatroomId)]
99 .getChatRoom(chatroomId)
100 .flatMap(chatroom -> get(chatroom, username, messageId));
103 private Mono<MessageTo> get(
110 .getMessage(username, messageId)
111 .map(message -> MessageTo.from(message));
114 @GetMapping(path = "{chatroomId}/listen")
115 public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
117 return chatHomes[selectShard(chatroomId)]
118 .getChatRoom(chatroomId)
119 .flatMapMany(chatroom -> listen(chatroom));
122 private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
127 .map(message -> MessageTo.from(message))
131 .id(messageTo.getSerial().toString())
136 @PostMapping("/store")
139 for (int shard = 0; shard < chatHomes.length; shard++)
140 storageStrategy.write(chatHomes[shard].getChatRooms());
143 private int selectShard(UUID chatroomId)
145 return selectionStrategy.selectShard(chatroomId);