40fa2bc4191edfbad18dd49d974c43915a9db50b
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.core.JsonParser;
5 import com.fasterxml.jackson.core.JsonToken;
6 import com.fasterxml.jackson.databind.ObjectMapper;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.Chatroom;
9 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
10 import de.juplo.kafka.chat.backend.domain.Message;
11 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15 import reactor.core.publisher.Sinks;
16
17 import java.io.IOException;
18 import java.nio.file.Files;
19 import java.nio.file.NoSuchFileException;
20 import java.nio.file.Path;
21 import java.util.LinkedHashMap;
22 import java.util.function.Function;
23 import java.util.stream.Collector;
24 import java.util.stream.Collectors;
25
26 import static java.nio.file.StandardOpenOption.CREATE;
27 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
28
29
30 @RequiredArgsConstructor
31 @Slf4j
32 public class LocalJsonFilesStorageStrategy implements StorageStrategy
33 {
34   public static final String CHATROOMS_FILENAME = "chatrooms.json";
35
36
37   private final Path storagePath;
38   private final ObjectMapper mapper;
39   private final ChatroomFactory chatroomFactory;
40
41
42   @Override
43   public void writeChatrooms(Flux<Chatroom> chatroomFlux)
44   {
45     Path path = chatroomsPath();
46     log.info("Writing chatrooms to {}", path);
47     try
48     {
49       Files.createDirectories(storagePath);
50
51       JsonGenerator generator =
52           mapper
53               .getFactory()
54               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
55
56       chatroomFlux
57           .log()
58           .doFirst(() ->
59           {
60             try
61             {
62               generator.useDefaultPrettyPrinter();
63               generator.writeStartArray();
64             }
65             catch (IOException e)
66             {
67               throw new RuntimeException(e);
68             }
69           })
70           .doOnTerminate(() ->
71           {
72             try
73             {
74               generator.writeEndArray();
75               generator.close();
76             }
77             catch (IOException e)
78             {
79               throw new RuntimeException(e);
80             }
81           })
82           .subscribe(chatroom ->
83           {
84             try
85             {
86               ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom);
87               generator.writeObject(chatroomInfo);
88               writeMessages(chatroomInfo, chatroom.getMessages());
89             }
90             catch (IOException e)
91             {
92               throw new RuntimeException(e);
93             }
94           });
95     }
96     catch (IOException e)
97     {
98       throw new RuntimeException(e);
99     }
100   }
101
102   @Override
103   public Flux<Chatroom> readChatrooms()
104   {
105     Path path = chatroomsPath();
106     log.info("Reading chatrooms from {}", path);
107     try
108     {
109       JsonParser parser =
110           mapper
111               .getFactory()
112               .createParser(Files.newBufferedReader(path));
113
114       if (parser.nextToken() != JsonToken.START_ARRAY)
115         throw new IllegalStateException("Expected content to be an array");
116
117       Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
118
119       while (parser.nextToken() != JsonToken.END_ARRAY)
120       {
121         many
122             .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
123             .orThrow();
124       }
125
126       many.tryEmitComplete().orThrow();
127
128       return many
129           .asFlux()
130           .map(chatroomInfo ->
131           {
132             LinkedHashMap<Message.MessageKey, Message> messages =
133                 readMessages(chatroomInfo)
134                     .collect(Collectors.toMap(
135                         Message::getKey,
136                         Function.identity(),
137                         (existing, message) ->
138                         {
139                           if (!message.equals(existing))
140                             throw new MessageMutationException(message, existing);
141                           return existing;
142                         },
143                         LinkedHashMap::new))
144                     .block();
145             InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
146             return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
147           });
148     }
149     catch (NoSuchFileException e)
150     {
151       log.info("{} does not exist - starting with empty ChatHome", path);
152       return Flux.empty();
153     }
154     catch (IOException e)
155     {
156       throw new RuntimeException(e);
157     }
158   }
159
160   @Override
161   public void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux)
162   {
163     Path path = chatroomPath(chatroomInfo);
164     log.info("Writing messages for {} to {}", chatroomInfo, path);
165     try
166     {
167       Files.createDirectories(storagePath);
168
169       JsonGenerator generator =
170           mapper
171               .getFactory()
172               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
173
174       messageFlux
175           .log()
176           .doFirst(() ->
177           {
178             try
179             {
180               generator.useDefaultPrettyPrinter();
181               generator.writeStartArray();
182             }
183             catch (IOException e)
184             {
185               throw new RuntimeException(e);
186             }
187           })
188           .doOnTerminate(() ->
189           {
190             try
191             {
192               generator.writeEndArray();
193               generator.close();
194             }
195             catch (IOException e)
196             {
197               throw new RuntimeException(e);
198             }
199           })
200           .subscribe(message ->
201           {
202             try
203             {
204               MessageTo messageTo = MessageTo.from(message);
205               generator.writeObject(messageTo);
206             }
207             catch (IOException e)
208             {
209               throw new RuntimeException(e);
210             }
211           });
212     }
213     catch (IOException e)
214     {
215       throw new RuntimeException(e);
216     }
217   }
218
219   @Override
220   public Flux<Message> readMessages(ChatroomInfo chatroomInfo)
221   {
222     Path path = chatroomPath(chatroomInfo);
223     log.info("Reading messages for {} from {}", chatroomInfo, path);
224     try
225     {
226       JsonParser parser =
227           mapper
228               .getFactory()
229               .createParser(Files.newBufferedReader(path));
230
231       if (parser.nextToken() != JsonToken.START_ARRAY)
232         throw new IllegalStateException("Expected content to be an array");
233
234       Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
235
236       while (parser.nextToken() != JsonToken.END_ARRAY)
237       {
238         many
239             .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
240             .orThrow();
241       }
242
243       many.tryEmitComplete().orThrow();
244
245       return many.asFlux();
246     }
247     catch (NoSuchFileException e)
248     {
249       log.info(
250           "{} does not exist - starting with empty chat for {}",
251           path,
252           chatroomInfo);
253       return Flux.empty();
254     }
255     catch (IOException e)
256     {
257       throw new RuntimeException(e);
258     }
259   }
260
261   Path chatroomsPath()
262   {
263     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
264   }
265
266   Path chatroomPath(ChatroomInfo chatroomInfo)
267   {
268     return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json"));
269   }
270 }