5d3c067aff5ea4f1f2c85db83fa71a880c8c8290
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import lombok.RequiredArgsConstructor;
12 import lombok.extern.slf4j.Slf4j;
13 import reactor.core.publisher.Flux;
14
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.time.Clock;
19
20 import static java.nio.file.StandardOpenOption.CREATE;
21 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
22
23
24 @RequiredArgsConstructor
25 @Slf4j
26 public class FilesStorageStrategy implements StorageStrategy
27 {
28   public static final String CHATROOMS_FILENAME = "chatrooms.json";
29
30
31   private final Path storagePath;
32   private final Clock clock;
33   private final int bufferSize;
34   private final ChatRoomServiceFactory factory;
35   private final ObjectMapper mapper;
36
37
38   @Override
39   public void write(Flux<ChatRoom> chatroomFlux)
40   {
41     Path path = chatroomsPath();
42     log.info("Writing chatrooms to {}", path);
43     try
44     {
45       Files.createDirectories(storagePath);
46
47       JsonGenerator generator =
48           mapper
49               .getFactory()
50               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
51
52       chatroomFlux
53           .log()
54           .doFirst(() ->
55           {
56             try
57             {
58               generator.useDefaultPrettyPrinter();
59               generator.writeStartArray();
60             }
61             catch (IOException e)
62             {
63               throw new RuntimeException(e);
64             }
65           })
66           .doOnTerminate(() ->
67           {
68             try
69             {
70               generator.writeEndArray();
71               generator.close();
72             }
73             catch (IOException e)
74             {
75               throw new RuntimeException(e);
76             }
77           })
78           .subscribe(chatroom ->
79           {
80             try
81             {
82               ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
83               generator.writeObject(chatroomTo);
84               writeMessages(chatroomTo, chatroom.getMessages());
85             }
86             catch (IOException e)
87             {
88               throw new RuntimeException(e);
89             }
90           });
91     }
92     catch (IOException e)
93     {
94       throw new RuntimeException(e);
95     }
96   }
97
98   @Override
99   public Flux<ChatRoom> read()
100   {
101     JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
102     return Flux
103         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
104         .log()
105         .map(chatRoomTo -> new ChatRoom(
106               chatRoomTo.getId(),
107               chatRoomTo.getName(),
108               clock,
109               factory.create(readMessages(chatRoomTo)),
110               bufferSize));
111   }
112
113   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
114   {
115     Path path = chatroomPath(chatroomTo);
116     log.info("Writing messages for {} to {}", chatroomTo, path);
117     try
118     {
119       Files.createDirectories(storagePath);
120
121       JsonGenerator generator =
122           mapper
123               .getFactory()
124               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
125
126       messageFlux
127           .log()
128           .doFirst(() ->
129           {
130             try
131             {
132               generator.useDefaultPrettyPrinter();
133               generator.writeStartArray();
134             }
135             catch (IOException e)
136             {
137               throw new RuntimeException(e);
138             }
139           })
140           .doOnTerminate(() ->
141           {
142             try
143             {
144               generator.writeEndArray();
145               generator.close();
146             }
147             catch (IOException e)
148             {
149               throw new RuntimeException(e);
150             }
151           })
152           .subscribe(message ->
153           {
154             try
155             {
156               MessageTo messageTo = MessageTo.from(message);
157               generator.writeObject(messageTo);
158             }
159             catch (IOException e)
160             {
161               throw new RuntimeException(e);
162             }
163           });
164     }
165     catch (IOException e)
166     {
167       throw new RuntimeException(e);
168     }
169   }
170
171   public Flux<Message> readMessages(ChatRoomTo chatroomTo)
172   {
173     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
174     return Flux
175         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
176         .log()
177         .map(MessageTo::toMessage);
178   }
179
180   Path chatroomsPath()
181   {
182     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
183   }
184
185   Path chatroomPath(ChatRoomTo chatroomTo)
186   {
187     return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
188   }
189 }