refactor: Only `LocalJsonFileStorageStrategy` restores `Chatroom`s
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatroomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.Chatroom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import reactor.core.publisher.Flux;
13
14 import java.io.IOException;
15 import java.nio.file.Files;
16 import java.nio.file.Path;
17
18 import static java.nio.file.StandardOpenOption.CREATE;
19 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
20
21
22 @RequiredArgsConstructor
23 @Slf4j
24 public class LocalJsonFilesStorageStrategy implements StorageStrategy
25 {
26   public static final String CHATROOMS_FILENAME = "chatrooms.json";
27
28
29   private final Path storagePath;
30   private final ObjectMapper mapper;
31   private final InMemoryChatroomFactory chatroomFactory;
32
33
34   @Override
35   public void writeChatrooms(Flux<Chatroom> chatroomFlux)
36   {
37     Path path = chatroomsPath();
38     log.info("Writing chatrooms to {}", path);
39     try
40     {
41       Files.createDirectories(storagePath);
42
43       JsonGenerator generator =
44           mapper
45               .getFactory()
46               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
47
48       chatroomFlux
49           .log()
50           .doFirst(() ->
51           {
52             try
53             {
54               generator.useDefaultPrettyPrinter();
55               generator.writeStartArray();
56             }
57             catch (IOException e)
58             {
59               throw new RuntimeException(e);
60             }
61           })
62           .doOnTerminate(() ->
63           {
64             try
65             {
66               generator.writeEndArray();
67               generator.close();
68             }
69             catch (IOException e)
70             {
71               throw new RuntimeException(e);
72             }
73           })
74           .subscribe(chatroom ->
75           {
76             try
77             {
78               ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
79               generator.writeObject(chatroomTo);
80               writeMessages(chatroomTo, chatroom.getMessages());
81             }
82             catch (IOException e)
83             {
84               throw new RuntimeException(e);
85             }
86           });
87     }
88     catch (IOException e)
89     {
90       throw new RuntimeException(e);
91     }
92   }
93
94   @Override
95   public Flux<Chatroom> readChatrooms()
96   {
97     JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
98     return Flux
99         .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
100         .log()
101         .map(chatroomTo ->
102         {
103           InMemoryPersistenceStrategy strategy =
104               new InMemoryPersistenceStrategy(readMessages(chatroomTo));
105           return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
106         });
107   }
108
109   @Override
110   public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
111   {
112     Path path = chatroomPath(chatroomTo);
113     log.info("Writing messages for {} to {}", chatroomTo, path);
114     try
115     {
116       Files.createDirectories(storagePath);
117
118       JsonGenerator generator =
119           mapper
120               .getFactory()
121               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
122
123       messageFlux
124           .log()
125           .doFirst(() ->
126           {
127             try
128             {
129               generator.useDefaultPrettyPrinter();
130               generator.writeStartArray();
131             }
132             catch (IOException e)
133             {
134               throw new RuntimeException(e);
135             }
136           })
137           .doOnTerminate(() ->
138           {
139             try
140             {
141               generator.writeEndArray();
142               generator.close();
143             }
144             catch (IOException e)
145             {
146               throw new RuntimeException(e);
147             }
148           })
149           .subscribe(message ->
150           {
151             try
152             {
153               MessageTo messageTo = MessageTo.from(message);
154               generator.writeObject(messageTo);
155             }
156             catch (IOException e)
157             {
158               throw new RuntimeException(e);
159             }
160           });
161     }
162     catch (IOException e)
163     {
164       throw new RuntimeException(e);
165     }
166   }
167
168   @Override
169   public Flux<Message> readMessages(ChatroomTo chatroomTo)
170   {
171     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
172     return Flux
173         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
174         .log()
175         .map(MessageTo::toMessage);
176   }
177
178   Path chatroomsPath()
179   {
180     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
181   }
182
183   Path chatroomPath(ChatroomTo chatroomTo)
184   {
185     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
186   }
187 }