feat: Prepared the application for sharding
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
6 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
7 import lombok.RequiredArgsConstructor;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.util.UUID;
14
15
16 @RestController
17 @RequiredArgsConstructor
18 public class ChatBackendController
19 {
20   private final ChatHome[] chatHomes;
21   private final ShardingStrategy selectionStrategy;
22   private final ChatRoomFactory factory;
23   private final StorageStrategy storageStrategy;
24
25
26   @PostMapping("create")
27   public Mono<ChatRoomTo> create(@RequestBody String name)
28   {
29     UUID chatRoomId = UUID.randomUUID();
30     return factory
31         .createChatRoom(chatRoomId, name)
32         .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
33         .map(ChatRoomTo::from);
34   }
35
36   @GetMapping("list")
37   public Flux<ChatRoomTo> list()
38   {
39     return Flux
40         .fromArray(chatHomes)
41         .flatMap(chatHome -> chatHome.getChatRooms())
42         .map(chatroom -> ChatRoomTo.from(chatroom));
43   }
44
45   @GetMapping("{chatroomId}/list")
46   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
47   {
48     int shard = selectionStrategy.selectShard(chatroomId);
49     return chatHomes[shard]
50         .getChatRoom(chatroomId)
51         .flatMapMany(chatroom -> chatroom
52             .getMessages()
53             .map(MessageTo::from));
54   }
55
56   @GetMapping("{chatroomId}")
57   public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
58   {
59     int shard = selectionStrategy.selectShard(chatroomId);
60     return chatHomes[shard]
61         .getChatRoom(chatroomId)
62         .map(chatroom -> ChatRoomTo.from(chatroom));
63   }
64
65   @PutMapping("{chatroomId}/{username}/{messageId}")
66   public Mono<MessageTo> put(
67       @PathVariable UUID chatroomId,
68       @PathVariable String username,
69       @PathVariable Long messageId,
70       @RequestBody String text)
71   {
72     int shard = selectionStrategy.selectShard(chatroomId);
73     return
74         chatHomes[shard]
75             .getChatRoom(chatroomId)
76             .flatMap(chatroom -> put(chatroom, username, messageId, text));
77   }
78
79   public Mono<MessageTo> put(
80       ChatRoom chatroom,
81       String username,
82       Long messageId,
83       String text)
84   {
85     return
86         chatroom
87             .addMessage(
88                 messageId,
89                 username,
90                 text)
91             .map(message -> MessageTo.from(message));
92   }
93
94   @GetMapping("{chatroomId}/{username}/{messageId}")
95   public Mono<MessageTo> get(
96       @PathVariable UUID chatroomId,
97       @PathVariable String username,
98       @PathVariable Long messageId)
99   {
100     int shard = selectionStrategy.selectShard(chatroomId);
101     return
102         chatHomes[shard]
103             .getChatRoom(chatroomId)
104             .flatMap(chatroom -> get(chatroom, username, messageId));
105   }
106
107   private Mono<MessageTo> get(
108       ChatRoom chatroom,
109       String username,
110       Long messageId)
111   {
112     return
113         chatroom
114             .getMessage(username, messageId)
115             .map(message -> MessageTo.from(message));
116   }
117
118   @GetMapping(path = "{chatroomId}/listen")
119   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
120   {
121     int shard = selectionStrategy.selectShard(chatroomId);
122     return chatHomes[shard]
123         .getChatRoom(chatroomId)
124         .flatMapMany(chatroom -> listen(chatroom));
125   }
126
127   private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
128   {
129     return chatroom
130         .listen()
131         .log()
132         .map(message -> MessageTo.from(message))
133         .map(messageTo ->
134             ServerSentEvent
135                 .builder(messageTo)
136                 .id(messageTo.getSerial().toString())
137                 .event("message")
138                 .build());
139   }
140
141   @PostMapping("/store")
142   public void store()
143   {
144     for (int shard = 0; shard < chatHomes.length; shard++)
145       storageStrategy.write(chatHomes[shard].getChatRooms());
146   }
147 }