feat: Moved persistence-logic from `ChatHome` into `ChatHomeService`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import reactor.core.publisher.Flux;
13
14 import java.io.IOException;
15 import java.nio.file.Files;
16 import java.nio.file.Path;
17 import java.time.Clock;
18
19 import static java.nio.file.StandardOpenOption.CREATE;
20 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
21
22
23 @RequiredArgsConstructor
24 @Slf4j
25 public class LocalJsonFilesStorageStrategy implements StorageStrategy
26 {
27   public static final String CHATROOMS_FILENAME = "chatrooms.json";
28
29
30   private final Path storagePath;
31   private final Clock clock;
32   private final int bufferSize;
33   private final ObjectMapper mapper;
34
35
36   @Override
37   public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
38   {
39     Path path = chatroomsPath();
40     log.info("Writing chatrooms to {}", path);
41     try
42     {
43       Files.createDirectories(storagePath);
44
45       JsonGenerator generator =
46           mapper
47               .getFactory()
48               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
49
50       chatroomFlux
51           .log()
52           .doFirst(() ->
53           {
54             try
55             {
56               generator.useDefaultPrettyPrinter();
57               generator.writeStartArray();
58             }
59             catch (IOException e)
60             {
61               throw new RuntimeException(e);
62             }
63           })
64           .doOnTerminate(() ->
65           {
66             try
67             {
68               generator.writeEndArray();
69               generator.close();
70             }
71             catch (IOException e)
72             {
73               throw new RuntimeException(e);
74             }
75           })
76           .subscribe(chatroom ->
77           {
78             try
79             {
80               ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
81               generator.writeObject(chatroomTo);
82               writeMessages(chatroomTo, chatroom.getMessages());
83             }
84             catch (IOException e)
85             {
86               throw new RuntimeException(e);
87             }
88           });
89     }
90     catch (IOException e)
91     {
92       throw new RuntimeException(e);
93     }
94   }
95
96   @Override
97   public Flux<ChatRoom> readChatrooms()
98   {
99     JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
100     return Flux
101         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
102         .log()
103         .map(chatRoomTo ->
104         {
105           InMemoryChatRoomService chatroomService =
106               new InMemoryChatRoomService(readMessages(chatRoomTo));
107           return new ChatRoom(
108               chatRoomTo.getId(),
109               chatRoomTo.getName(),
110               clock,
111               chatroomService,
112               bufferSize);
113         });
114   }
115
116   @Override
117   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
118   {
119     Path path = chatroomPath(chatroomTo);
120     log.info("Writing messages for {} to {}", chatroomTo, path);
121     try
122     {
123       Files.createDirectories(storagePath);
124
125       JsonGenerator generator =
126           mapper
127               .getFactory()
128               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
129
130       messageFlux
131           .log()
132           .doFirst(() ->
133           {
134             try
135             {
136               generator.useDefaultPrettyPrinter();
137               generator.writeStartArray();
138             }
139             catch (IOException e)
140             {
141               throw new RuntimeException(e);
142             }
143           })
144           .doOnTerminate(() ->
145           {
146             try
147             {
148               generator.writeEndArray();
149               generator.close();
150             }
151             catch (IOException e)
152             {
153               throw new RuntimeException(e);
154             }
155           })
156           .subscribe(message ->
157           {
158             try
159             {
160               MessageTo messageTo = MessageTo.from(message);
161               generator.writeObject(messageTo);
162             }
163             catch (IOException e)
164             {
165               throw new RuntimeException(e);
166             }
167           });
168     }
169     catch (IOException e)
170     {
171       throw new RuntimeException(e);
172     }
173   }
174
175   @Override
176   public Flux<Message> readMessages(ChatRoomTo chatroomTo)
177   {
178     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
179     return Flux
180         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
181         .log()
182         .map(MessageTo::toMessage);
183   }
184
185   Path chatroomsPath()
186   {
187     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
188   }
189
190   Path chatroomPath(ChatRoomTo chatroomTo)
191   {
192     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
193   }
194 }