feat: Introduced a kafka-like `ShardingStrategy` for `inmemory`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.api.ShardingStrategy;
9 import de.juplo.kafka.chat.backend.domain.ChatRoom;
10 import de.juplo.kafka.chat.backend.domain.Message;
11 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.time.Clock;
20 import java.util.UUID;
21
22 import static java.nio.file.StandardOpenOption.CREATE;
23 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
24
25
26 @RequiredArgsConstructor
27 @Slf4j
28 public class FilesStorageStrategy implements StorageStrategy
29 {
30   public static final String CHATROOMS_FILENAME = "chatrooms.json";
31
32
33   private final Path storagePath;
34   private final Clock clock;
35   private final int bufferSize;
36   private final ShardingStrategy shardingStrategy;
37   private final ChatRoomServiceFactory factory;
38   private final ObjectMapper mapper;
39
40
41   @Override
42   public void write(Flux<ChatRoom> chatroomFlux)
43   {
44     Path path = chatroomsPath();
45     log.info("Writing chatrooms to {}", path);
46     try
47     {
48       Files.createDirectories(storagePath);
49
50       JsonGenerator generator =
51           mapper
52               .getFactory()
53               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
54
55       chatroomFlux
56           .log()
57           .doFirst(() ->
58           {
59             try
60             {
61               generator.useDefaultPrettyPrinter();
62               generator.writeStartArray();
63             }
64             catch (IOException e)
65             {
66               throw new RuntimeException(e);
67             }
68           })
69           .doOnTerminate(() ->
70           {
71             try
72             {
73               generator.writeEndArray();
74               generator.close();
75             }
76             catch (IOException e)
77             {
78               throw new RuntimeException(e);
79             }
80           })
81           .subscribe(chatroom ->
82           {
83             try
84             {
85               ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
86               generator.writeObject(chatroomTo);
87               writeMessages(chatroomTo, chatroom.getMessages());
88             }
89             catch (IOException e)
90             {
91               throw new RuntimeException(e);
92             }
93           });
94     }
95     catch (IOException e)
96     {
97       throw new RuntimeException(e);
98     }
99   }
100
101   @Override
102   public Flux<ChatRoom> read()
103   {
104     JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
105     return Flux
106         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
107         .log()
108         .map(chatRoomTo ->
109         {
110           UUID chatRoomId = chatRoomTo.getId();
111           int shard = shardingStrategy.selectShard(chatRoomId);
112           return new ChatRoom(
113               chatRoomTo.getId(),
114               chatRoomTo.getName(),
115               shard,
116               clock,
117               factory.create(readMessages(chatRoomTo)),
118               bufferSize);
119         });
120   }
121
122   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
123   {
124     Path path = chatroomPath(chatroomTo);
125     log.info("Writing messages for {} to {}", chatroomTo, path);
126     try
127     {
128       Files.createDirectories(storagePath);
129
130       JsonGenerator generator =
131           mapper
132               .getFactory()
133               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
134
135       messageFlux
136           .log()
137           .doFirst(() ->
138           {
139             try
140             {
141               generator.useDefaultPrettyPrinter();
142               generator.writeStartArray();
143             }
144             catch (IOException e)
145             {
146               throw new RuntimeException(e);
147             }
148           })
149           .doOnTerminate(() ->
150           {
151             try
152             {
153               generator.writeEndArray();
154               generator.close();
155             }
156             catch (IOException e)
157             {
158               throw new RuntimeException(e);
159             }
160           })
161           .subscribe(message ->
162           {
163             try
164             {
165               MessageTo messageTo = MessageTo.from(message);
166               generator.writeObject(messageTo);
167             }
168             catch (IOException e)
169             {
170               throw new RuntimeException(e);
171             }
172           });
173     }
174     catch (IOException e)
175     {
176       throw new RuntimeException(e);
177     }
178   }
179
180   public Flux<Message> readMessages(ChatRoomTo chatroomTo)
181   {
182     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
183     return Flux
184         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
185         .log()
186         .map(MessageTo::toMessage);
187   }
188
189   Path chatroomsPath()
190   {
191     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
192   }
193
194   Path chatroomPath(ChatRoomTo chatroomTo)
195   {
196     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
197   }
198 }