refactor: DRY für shard-selection
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / api / ChatBackendController.java
1 package de.juplo.kafka.chat.backend.api;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
6 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
7 import lombok.RequiredArgsConstructor;
8 import org.springframework.http.codec.ServerSentEvent;
9 import org.springframework.web.bind.annotation.*;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.util.UUID;
14
15
16 @RestController
17 @RequiredArgsConstructor
18 public class ChatBackendController
19 {
20   private final ChatHome[] chatHomes;
21   private final ShardingStrategy selectionStrategy;
22   private final ChatRoomFactory factory;
23   private final StorageStrategy storageStrategy;
24
25
26   @PostMapping("create")
27   public Mono<ChatRoomTo> create(@RequestBody String name)
28   {
29     UUID chatRoomId = UUID.randomUUID();
30     return factory
31         .createChatRoom(chatRoomId, name)
32         .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
33         .map(ChatRoomTo::from);
34   }
35
36   @GetMapping("list")
37   public Flux<ChatRoomTo> list()
38   {
39     return Flux
40         .fromArray(chatHomes)
41         .flatMap(chatHome -> chatHome.getChatRooms())
42         .map(chatroom -> ChatRoomTo.from(chatroom));
43   }
44
45   @GetMapping("{chatroomId}/list")
46   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
47   {
48     return chatHomes[selectShard(chatroomId)]
49         .getChatRoom(chatroomId)
50         .flatMapMany(chatroom -> chatroom
51             .getMessages()
52             .map(MessageTo::from));
53   }
54
55   @GetMapping("{chatroomId}")
56   public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
57   {
58     return chatHomes[selectShard(chatroomId)]
59         .getChatRoom(chatroomId)
60         .map(chatroom -> ChatRoomTo.from(chatroom));
61   }
62
63   @PutMapping("{chatroomId}/{username}/{messageId}")
64   public Mono<MessageTo> put(
65       @PathVariable UUID chatroomId,
66       @PathVariable String username,
67       @PathVariable Long messageId,
68       @RequestBody String text)
69   {
70     return
71         chatHomes[selectShard(chatroomId)]
72             .getChatRoom(chatroomId)
73             .flatMap(chatroom -> put(chatroom, username, messageId, text));
74   }
75
76   public Mono<MessageTo> put(
77       ChatRoom chatroom,
78       String username,
79       Long messageId,
80       String text)
81   {
82     return
83         chatroom
84             .addMessage(
85                 messageId,
86                 username,
87                 text)
88             .map(message -> MessageTo.from(message));
89   }
90
91   @GetMapping("{chatroomId}/{username}/{messageId}")
92   public Mono<MessageTo> get(
93       @PathVariable UUID chatroomId,
94       @PathVariable String username,
95       @PathVariable Long messageId)
96   {
97     return
98         chatHomes[selectShard(chatroomId)]
99             .getChatRoom(chatroomId)
100             .flatMap(chatroom -> get(chatroom, username, messageId));
101   }
102
103   private Mono<MessageTo> get(
104       ChatRoom chatroom,
105       String username,
106       Long messageId)
107   {
108     return
109         chatroom
110             .getMessage(username, messageId)
111             .map(message -> MessageTo.from(message));
112   }
113
114   @GetMapping(path = "{chatroomId}/listen")
115   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
116   {
117     return chatHomes[selectShard(chatroomId)]
118         .getChatRoom(chatroomId)
119         .flatMapMany(chatroom -> listen(chatroom));
120   }
121
122   private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
123   {
124     return chatroom
125         .listen()
126         .log()
127         .map(message -> MessageTo.from(message))
128         .map(messageTo ->
129             ServerSentEvent
130                 .builder(messageTo)
131                 .id(messageTo.getSerial().toString())
132                 .event("message")
133                 .build());
134   }
135
136   @PostMapping("/store")
137   public void store()
138   {
139     for (int shard = 0; shard < chatHomes.length; shard++)
140       storageStrategy.write(chatHomes[shard].getChatRooms());
141   }
142
143   private int selectShard(UUID chatroomId)
144   {
145     return selectionStrategy.selectShard(chatroomId);
146   }
147 }