1 package de.juplo.kafka.chat.backend.api;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
6 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
7 import lombok.RequiredArgsConstructor;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.util.UUID;
17 @RequiredArgsConstructor
18 public class ChatBackendController
20 private final ChatHome[] chatHomes;
21 private final ShardingStrategy selectionStrategy;
22 private final ChatRoomFactory factory;
23 private final StorageStrategy storageStrategy;
26 @PostMapping("create")
27 public Mono<ChatRoomTo> create(@RequestBody String name)
29 UUID chatRoomId = UUID.randomUUID();
31 .createChatRoom(chatRoomId, name)
32 .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
33 .map(ChatRoomTo::from);
37 public Flux<ChatRoomTo> list()
41 .flatMap(chatHome -> chatHome.getChatRooms())
42 .map(chatroom -> ChatRoomTo.from(chatroom));
45 @GetMapping("{chatroomId}/list")
46 public Flux<MessageTo> list(@PathVariable UUID chatroomId)
48 int shard = selectionStrategy.selectShard(chatroomId);
49 return chatHomes[shard]
50 .getChatRoom(chatroomId)
51 .flatMapMany(chatroom -> chatroom
53 .map(MessageTo::from));
56 @GetMapping("{chatroomId}")
57 public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
59 int shard = selectionStrategy.selectShard(chatroomId);
60 return chatHomes[shard]
61 .getChatRoom(chatroomId)
62 .map(chatroom -> ChatRoomTo.from(chatroom));
65 @PutMapping("{chatroomId}/{username}/{messageId}")
66 public Mono<MessageTo> put(
67 @PathVariable UUID chatroomId,
68 @PathVariable String username,
69 @PathVariable Long messageId,
70 @RequestBody String text)
72 int shard = selectionStrategy.selectShard(chatroomId);
75 .getChatRoom(chatroomId)
76 .flatMap(chatroom -> put(chatroom, username, messageId, text));
79 public Mono<MessageTo> put(
91 .map(message -> MessageTo.from(message));
94 @GetMapping("{chatroomId}/{username}/{messageId}")
95 public Mono<MessageTo> get(
96 @PathVariable UUID chatroomId,
97 @PathVariable String username,
98 @PathVariable Long messageId)
100 int shard = selectionStrategy.selectShard(chatroomId);
103 .getChatRoom(chatroomId)
104 .flatMap(chatroom -> get(chatroom, username, messageId));
107 private Mono<MessageTo> get(
114 .getMessage(username, messageId)
115 .map(message -> MessageTo.from(message));
118 @GetMapping(path = "{chatroomId}/listen")
119 public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
121 int shard = selectionStrategy.selectShard(chatroomId);
122 return chatHomes[shard]
123 .getChatRoom(chatroomId)
124 .flatMapMany(chatroom -> listen(chatroom));
127 private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
132 .map(message -> MessageTo.from(message))
136 .id(messageTo.getSerial().toString())
141 @PostMapping("/store")
144 for (int shard = 0; shard < chatHomes.length; shard++)
145 storageStrategy.write(chatHomes[shard].getChatRooms());