refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / LocalJsonFilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatroomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.Chatroom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import reactor.core.publisher.Flux;
13
14 import java.io.IOException;
15 import java.nio.file.Files;
16 import java.nio.file.Path;
17
18 import static java.nio.file.StandardOpenOption.CREATE;
19 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
20
21
22 @RequiredArgsConstructor
23 @Slf4j
24 public class LocalJsonFilesStorageStrategy implements StorageStrategy
25 {
26   public static final String CHATROOMS_FILENAME = "chatrooms.json";
27
28
29   private final Path storagePath;
30   private final ObjectMapper mapper;
31   private final InMemoryChatroomFactory chatroomFactory;
32
33
34   @Override
35   public void writeChatrooms(Flux<Chatroom> chatroomFlux)
36   {
37     Path path = chatroomsPath();
38     log.info("Writing chatrooms to {}", path);
39     try
40     {
41       Files.createDirectories(storagePath);
42
43       JsonGenerator generator =
44           mapper
45               .getFactory()
46               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
47
48       chatroomFlux
49           .log()
50           .doFirst(() ->
51           {
52             try
53             {
54               generator.useDefaultPrettyPrinter();
55               generator.writeStartArray();
56             }
57             catch (IOException e)
58             {
59               throw new RuntimeException(e);
60             }
61           })
62           .doOnTerminate(() ->
63           {
64             try
65             {
66               generator.writeEndArray();
67               generator.close();
68             }
69             catch (IOException e)
70             {
71               throw new RuntimeException(e);
72             }
73           })
74           .subscribe(chatroom ->
75           {
76             try
77             {
78               ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
79               generator.writeObject(chatroomTo);
80               writeMessages(chatroomTo, chatroom.getMessages());
81             }
82             catch (IOException e)
83             {
84               throw new RuntimeException(e);
85             }
86           });
87     }
88     catch (IOException e)
89     {
90       throw new RuntimeException(e);
91     }
92   }
93
94   @Override
95   public Flux<Chatroom> readChatrooms()
96   {
97     JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
98     return Flux
99         .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
100         .log()
101         .map(chatroomTo ->
102         {
103           InMemoryChatroomService chatroomService =
104               new InMemoryChatroomService(readMessages(chatroomTo));
105           return chatroomFactory.restoreChatroom(
106               chatroomTo.getId(),
107               chatroomTo.getName(),
108               chatroomService);
109         });
110   }
111
112   @Override
113   public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
114   {
115     Path path = chatroomPath(chatroomTo);
116     log.info("Writing messages for {} to {}", chatroomTo, path);
117     try
118     {
119       Files.createDirectories(storagePath);
120
121       JsonGenerator generator =
122           mapper
123               .getFactory()
124               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
125
126       messageFlux
127           .log()
128           .doFirst(() ->
129           {
130             try
131             {
132               generator.useDefaultPrettyPrinter();
133               generator.writeStartArray();
134             }
135             catch (IOException e)
136             {
137               throw new RuntimeException(e);
138             }
139           })
140           .doOnTerminate(() ->
141           {
142             try
143             {
144               generator.writeEndArray();
145               generator.close();
146             }
147             catch (IOException e)
148             {
149               throw new RuntimeException(e);
150             }
151           })
152           .subscribe(message ->
153           {
154             try
155             {
156               MessageTo messageTo = MessageTo.from(message);
157               generator.writeObject(messageTo);
158             }
159             catch (IOException e)
160             {
161               throw new RuntimeException(e);
162             }
163           });
164     }
165     catch (IOException e)
166     {
167       throw new RuntimeException(e);
168     }
169   }
170
171   @Override
172   public Flux<Message> readMessages(ChatroomTo chatroomTo)
173   {
174     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
175     return Flux
176         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
177         .log()
178         .map(MessageTo::toMessage);
179   }
180
181   Path chatroomsPath()
182   {
183     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
184   }
185
186   Path chatroomPath(ChatroomTo chatroomTo)
187   {
188     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
189   }
190 }