refactor: Moved `FilesStorageStrategy` into its own package -- Move
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.filestorage;
2
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
22
23
24 @RequiredArgsConstructor
25 @Slf4j
26 public class FileStorageStrategy implements StorageStrategy
27 {
28   public static final String CHATROOMS_FILENAME = "chatrooms.json";
29
30
31   private final Path storagePath;
32   private final Clock clock;
33   private final int bufferSize;
34   private final ChatRoomServiceFactory factory;
35   private final ObjectMapper mapper;
36
37
38   @Override
39   public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
40   {
41     Path path = chatroomsPath();
42     log.info("Writing chatrooms to {}", path);
43     try
44     {
45       Files.createDirectories(storagePath);
46
47       JsonGenerator generator =
48           mapper
49               .getFactory()
50               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
51
52       chatroomFlux
53           .log()
54           .doFirst(() ->
55           {
56             try
57             {
58               generator.useDefaultPrettyPrinter();
59               generator.writeStartArray();
60             }
61             catch (IOException e)
62             {
63               throw new RuntimeException(e);
64             }
65           })
66           .doOnTerminate(() ->
67           {
68             try
69             {
70               generator.writeEndArray();
71               generator.close();
72             }
73             catch (IOException e)
74             {
75               throw new RuntimeException(e);
76             }
77           })
78           .subscribe(chatroom ->
79           {
80             try
81             {
82               ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
83               generator.writeObject(chatroomTo);
84               writeMessages(chatroomTo, chatroom.getMessages());
85             }
86             catch (IOException e)
87             {
88               throw new RuntimeException(e);
89             }
90           });
91     }
92     catch (IOException e)
93     {
94       throw new RuntimeException(e);
95     }
96   }
97
98   @Override
99   public Flux<ChatRoom> readChatrooms()
100   {
101     JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
102     return Flux
103         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
104         .log()
105         .map(chatRoomTo -> new ChatRoom(
106             chatRoomTo.getId(),
107             chatRoomTo.getName(),
108             clock,
109             factory.create(readMessages(chatRoomTo)),
110             bufferSize));
111   }
112
113   @Override
114   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
115   {
116     Path path = chatroomPath(chatroomTo);
117     log.info("Writing messages for {} to {}", chatroomTo, path);
118     try
119     {
120       Files.createDirectories(storagePath);
121
122       JsonGenerator generator =
123           mapper
124               .getFactory()
125               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
126
127       messageFlux
128           .log()
129           .doFirst(() ->
130           {
131             try
132             {
133               generator.useDefaultPrettyPrinter();
134               generator.writeStartArray();
135             }
136             catch (IOException e)
137             {
138               throw new RuntimeException(e);
139             }
140           })
141           .doOnTerminate(() ->
142           {
143             try
144             {
145               generator.writeEndArray();
146               generator.close();
147             }
148             catch (IOException e)
149             {
150               throw new RuntimeException(e);
151             }
152           })
153           .subscribe(message ->
154           {
155             try
156             {
157               MessageTo messageTo = MessageTo.from(message);
158               generator.writeObject(messageTo);
159             }
160             catch (IOException e)
161             {
162               throw new RuntimeException(e);
163             }
164           });
165     }
166     catch (IOException e)
167     {
168       throw new RuntimeException(e);
169     }
170   }
171
172   @Override
173   public Flux<Message> readMessages(ChatRoomTo chatroomTo)
174   {
175     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
176     return Flux
177         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
178         .log()
179         .map(MessageTo::toMessage);
180   }
181
182   Path chatroomsPath()
183   {
184     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
185   }
186
187   Path chatroomPath(ChatRoomTo chatroomTo)
188   {
189     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
190   }
191 }