refactor: Moved classes in package `persistence` into sub-packages -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / filestorage / FileStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.filestorage;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.time.Clock;
20
21 import static java.nio.file.StandardOpenOption.CREATE;
22 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
23
24
25 @RequiredArgsConstructor
26 @Slf4j
27 public class FileStorageStrategy implements StorageStrategy
28 {
29   public static final String CHATROOMS_FILENAME = "chatrooms.json";
30
31
32   private final Path storagePath;
33   private final Clock clock;
34   private final int bufferSize;
35   private final ObjectMapper mapper;
36
37
38   @Override
39   public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
40   {
41     Path path = chatroomsPath();
42     log.info("Writing chatrooms to {}", path);
43     try
44     {
45       Files.createDirectories(storagePath);
46
47       JsonGenerator generator =
48           mapper
49               .getFactory()
50               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
51
52       chatroomFlux
53           .log()
54           .doFirst(() ->
55           {
56             try
57             {
58               generator.useDefaultPrettyPrinter();
59               generator.writeStartArray();
60             }
61             catch (IOException e)
62             {
63               throw new RuntimeException(e);
64             }
65           })
66           .doOnTerminate(() ->
67           {
68             try
69             {
70               generator.writeEndArray();
71               generator.close();
72             }
73             catch (IOException e)
74             {
75               throw new RuntimeException(e);
76             }
77           })
78           .subscribe(chatroom ->
79           {
80             try
81             {
82               ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
83               generator.writeObject(chatroomTo);
84               writeMessages(chatroomTo, chatroom.getMessages());
85             }
86             catch (IOException e)
87             {
88               throw new RuntimeException(e);
89             }
90           });
91     }
92     catch (IOException e)
93     {
94       throw new RuntimeException(e);
95     }
96   }
97
98   @Override
99   public Flux<ChatRoom> readChatrooms()
100   {
101     JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
102     return Flux
103         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
104         .log()
105         .map(chatRoomTo ->
106         {
107           InMemoryChatRoomService chatroomService =
108               new InMemoryChatRoomService(readMessages(chatRoomTo));
109           return new ChatRoom(
110               chatRoomTo.getId(),
111               chatRoomTo.getName(),
112               clock,
113               chatroomService,
114               bufferSize);
115         });
116   }
117
118   @Override
119   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
120   {
121     Path path = chatroomPath(chatroomTo);
122     log.info("Writing messages for {} to {}", chatroomTo, path);
123     try
124     {
125       Files.createDirectories(storagePath);
126
127       JsonGenerator generator =
128           mapper
129               .getFactory()
130               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
131
132       messageFlux
133           .log()
134           .doFirst(() ->
135           {
136             try
137             {
138               generator.useDefaultPrettyPrinter();
139               generator.writeStartArray();
140             }
141             catch (IOException e)
142             {
143               throw new RuntimeException(e);
144             }
145           })
146           .doOnTerminate(() ->
147           {
148             try
149             {
150               generator.writeEndArray();
151               generator.close();
152             }
153             catch (IOException e)
154             {
155               throw new RuntimeException(e);
156             }
157           })
158           .subscribe(message ->
159           {
160             try
161             {
162               MessageTo messageTo = MessageTo.from(message);
163               generator.writeObject(messageTo);
164             }
165             catch (IOException e)
166             {
167               throw new RuntimeException(e);
168             }
169           });
170     }
171     catch (IOException e)
172     {
173       throw new RuntimeException(e);
174     }
175   }
176
177   @Override
178   public Flux<Message> readMessages(ChatRoomTo chatroomTo)
179   {
180     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
181     return Flux
182         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
183         .log()
184         .map(MessageTo::toMessage);
185   }
186
187   Path chatroomsPath()
188   {
189     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
190   }
191
192   Path chatroomPath(ChatRoomTo chatroomTo)
193   {
194     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
195   }
196 }