refactor: `LocalJsonFilesStorageStrategy` is now realy reactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatroomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.Chatroom;
9 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
10 import de.juplo.kafka.chat.backend.domain.Message;
11 import lombok.RequiredArgsConstructor;
12 import lombok.extern.slf4j.Slf4j;
13 import reactor.core.publisher.Flux;
14
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18
19 import static java.nio.file.StandardOpenOption.CREATE;
20 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
21
22
23 @RequiredArgsConstructor
24 @Slf4j
25 public class LocalJsonFilesStorageStrategy implements StorageStrategy
26 {
27   public static final String CHATROOMS_FILENAME = "chatrooms.json";
28
29
30   private final Path storagePath;
31   private final ObjectMapper mapper;
32   private final ChatroomFactory chatroomFactory;
33
34
35   @Override
36   public void writeChatrooms(Flux<Chatroom> chatroomFlux)
37   {
38     Path path = chatroomsPath();
39     log.info("Writing chatrooms to {}", path);
40     try
41     {
42       Files.createDirectories(storagePath);
43
44       JsonGenerator generator =
45           mapper
46               .getFactory()
47               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
48
49       chatroomFlux
50           .log()
51           .doFirst(() ->
52           {
53             try
54             {
55               generator.useDefaultPrettyPrinter();
56               generator.writeStartArray();
57             }
58             catch (IOException e)
59             {
60               throw new RuntimeException(e);
61             }
62           })
63           .doOnTerminate(() ->
64           {
65             try
66             {
67               generator.writeEndArray();
68               generator.close();
69             }
70             catch (IOException e)
71             {
72               throw new RuntimeException(e);
73             }
74           })
75           .subscribe(chatroom ->
76           {
77             try
78             {
79               ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
80               generator.writeObject(chatroomTo);
81               writeMessages(chatroomTo, chatroom.getMessages());
82             }
83             catch (IOException e)
84             {
85               throw new RuntimeException(e);
86             }
87           });
88     }
89     catch (IOException e)
90     {
91       throw new RuntimeException(e);
92     }
93   }
94
95   @Override
96   public Flux<Chatroom> readChatrooms()
97   {
98     JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
99     return Flux
100         .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
101         .log()
102         .map(chatroomTo ->
103         {
104           InMemoryPersistenceStrategy strategy =
105               new InMemoryPersistenceStrategy(readMessages(chatroomTo));
106           return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
107         });
108   }
109
110   @Override
111   public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
112   {
113     Path path = chatroomPath(chatroomTo);
114     log.info("Writing messages for {} to {}", chatroomTo, path);
115     try
116     {
117       Files.createDirectories(storagePath);
118
119       JsonGenerator generator =
120           mapper
121               .getFactory()
122               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
123
124       messageFlux
125           .log()
126           .doFirst(() ->
127           {
128             try
129             {
130               generator.useDefaultPrettyPrinter();
131               generator.writeStartArray();
132             }
133             catch (IOException e)
134             {
135               throw new RuntimeException(e);
136             }
137           })
138           .doOnTerminate(() ->
139           {
140             try
141             {
142               generator.writeEndArray();
143               generator.close();
144             }
145             catch (IOException e)
146             {
147               throw new RuntimeException(e);
148             }
149           })
150           .subscribe(message ->
151           {
152             try
153             {
154               MessageTo messageTo = MessageTo.from(message);
155               generator.writeObject(messageTo);
156             }
157             catch (IOException e)
158             {
159               throw new RuntimeException(e);
160             }
161           });
162     }
163     catch (IOException e)
164     {
165       throw new RuntimeException(e);
166     }
167   }
168
169   @Override
170   public Flux<Message> readMessages(ChatroomTo chatroomTo)
171   {
172     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
173     return Flux
174         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
175         .log()
176         .map(MessageTo::toMessage);
177   }
178
179   Path chatroomsPath()
180   {
181     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
182   }
183
184   Path chatroomPath(ChatroomTo chatroomTo)
185   {
186     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
187   }
188 }