fd8939b64f06bd7a6c7e9480423bca299895d929
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
20 import java.util.logging.Level;
21
22 import static java.nio.file.StandardOpenOption.CREATE;
23 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
24
25
26 @RequiredArgsConstructor
27 @Slf4j
28 public class FilesStorageStrategy implements StorageStrategy
29 {
30   public static final String CHATROOMS_FILENAME = "chatrooms.json";
31
32
33   private final Path storagePath;
34   private final ShardingStrategy shardingStrategy;
35   private final ObjectMapper mapper;
36
37
38   @Override
39   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
40   {
41     Path path = chatroomsPath();
42     log.info("Writing chatrooms to {}", path);
43     try
44     {
45       Files.createDirectories(storagePath);
46
47       JsonGenerator generator =
48           mapper
49               .getFactory()
50               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
51
52       return chatRoomInfoFlux
53           .log(
54               FilesStorageStrategy.class.getSimpleName(),
55               Level.FINE,
56               true)
57           .doFirst(() ->
58           {
59             try
60             {
61               generator.useDefaultPrettyPrinter();
62               generator.writeStartArray();
63             }
64             catch (IOException e)
65             {
66               throw new RuntimeException(e);
67             }
68           })
69           .doOnTerminate(() ->
70           {
71             try
72             {
73               generator.writeEndArray();
74               generator.close();
75             }
76             catch (IOException e)
77             {
78               throw new RuntimeException(e);
79             }
80           })
81           .map(chatRoomInfo ->
82           {
83             try
84             {
85               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
86               generator.writeObject(chatRoomInfoTo);
87               return chatRoomInfo;
88             }
89             catch (IOException e)
90             {
91               throw new RuntimeException(e);
92             }
93           });
94     }
95     catch (IOException e)
96     {
97       throw new RuntimeException(e);
98     }
99   }
100
101   @Override
102   public Flux<ChatRoomInfo> readChatRoomInfo()
103   {
104     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
105     return Flux
106         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
107         .log(
108             FilesStorageStrategy.class.getSimpleName(),
109             Level.FINE,
110             true)
111         .map(chatRoomInfoTo ->
112         {
113           UUID chatRoomId = chatRoomInfoTo.getId();
114           int shard = shardingStrategy.selectShard(chatRoomId);
115
116           log.info(
117               "{} - old shard: {}, new shard:  {}",
118               chatRoomId,
119               chatRoomInfoTo.getShard(),
120               shard);
121
122           return new ChatRoomInfo(
123               chatRoomId,
124               chatRoomInfoTo.getName(),
125               shard);
126         });
127   }
128
129   @Override
130   public Flux<Message> writeChatRoomData(
131       UUID chatRoomId,
132       Flux<Message> messageFlux)
133   {
134     Path path = chatroomPath(chatRoomId);
135     log.info("Writing messages for {} to {}", chatRoomId, path);
136     try
137     {
138       Files.createDirectories(storagePath);
139
140       JsonGenerator generator =
141           mapper
142               .getFactory()
143               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
144
145       return messageFlux
146           .log(
147               FilesStorageStrategy.class.getSimpleName(),
148               Level.FINE,
149               true)
150           .doFirst(() ->
151           {
152             try
153             {
154               generator.useDefaultPrettyPrinter();
155               generator.writeStartArray();
156             }
157             catch (IOException e)
158             {
159               throw new RuntimeException(e);
160             }
161           })
162           .doOnTerminate(() ->
163           {
164             try
165             {
166               generator.writeEndArray();
167               generator.close();
168             }
169             catch (IOException e)
170             {
171               throw new RuntimeException(e);
172             }
173           })
174           .map(message ->
175           {
176             try
177             {
178               MessageTo messageTo = MessageTo.from(message);
179               generator.writeObject(messageTo);
180               return message;
181             }
182             catch (IOException e)
183             {
184               throw new RuntimeException(e);
185             }
186           });
187     }
188     catch (IOException e)
189     {
190       throw new RuntimeException(e);
191     }
192   }
193
194   @Override
195   public Flux<Message> readChatRoomData(UUID chatRoomId)
196   {
197     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
198     return Flux
199         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
200         .log(
201             FilesStorageStrategy.class.getSimpleName(),
202             Level.FINE,
203             true)
204         .map(MessageTo::toMessage);
205   }
206
207   Path chatroomsPath()
208   {
209     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
210   }
211
212   Path chatroomPath(UUID id)
213   {
214     return storagePath.resolve(Path.of(id.toString() + ".json"));
215   }
216 }