1 package de.juplo.kafka.chat.backend.persistence.storage.files;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
10 import de.juplo.kafka.chat.backend.domain.ChatRoom;
11 import de.juplo.kafka.chat.backend.domain.Message;
12 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
13 import lombok.RequiredArgsConstructor;
14 import lombok.extern.slf4j.Slf4j;
15 import reactor.core.publisher.Flux;
17 import java.io.IOException;
18 import java.nio.file.Files;
19 import java.nio.file.Path;
20 import java.time.Clock;
21 import java.util.UUID;
23 import static java.nio.file.StandardOpenOption.CREATE;
24 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
27 @RequiredArgsConstructor
29 public class FilesStorageStrategy implements StorageStrategy
31 public static final String CHATROOMS_FILENAME = "chatrooms.json";
34 private final Path storagePath;
35 private final Clock clock;
36 private final int bufferSize;
37 private final ShardingStrategy shardingStrategy;
38 private final ChatRoomServiceFactory factory;
39 private final ObjectMapper mapper;
43 public void write(Flux<ChatRoomInfo> chatroomFlux)
45 Path path = chatroomsPath();
46 log.info("Writing chatrooms to {}", path);
49 Files.createDirectories(storagePath);
51 JsonGenerator generator =
54 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
62 generator.useDefaultPrettyPrinter();
63 generator.writeStartArray();
67 throw new RuntimeException(e);
74 generator.writeEndArray();
79 throw new RuntimeException(e);
82 .subscribe(chatroom ->
86 ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
87 generator.writeObject(infoTo);
88 writeMessages(infoTo, chatroom.getMessages());
92 throw new RuntimeException(e);
98 throw new RuntimeException(e);
103 public Flux<ChatRoom> read()
105 JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
107 .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
111 UUID chatRoomId = infoTo.getId();
112 int shard = shardingStrategy.selectShard(chatRoomId);
118 factory.create(readMessages(infoTo)),
123 public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
125 Path path = chatroomPath(infoTo);
126 log.info("Writing messages for {} to {}", infoTo, path);
129 Files.createDirectories(storagePath);
131 JsonGenerator generator =
134 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
142 generator.useDefaultPrettyPrinter();
143 generator.writeStartArray();
145 catch (IOException e)
147 throw new RuntimeException(e);
154 generator.writeEndArray();
157 catch (IOException e)
159 throw new RuntimeException(e);
162 .subscribe(message ->
166 MessageTo messageTo = MessageTo.from(message);
167 generator.writeObject(messageTo);
169 catch (IOException e)
171 throw new RuntimeException(e);
175 catch (IOException e)
177 throw new RuntimeException(e);
181 public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
183 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
185 .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
187 .map(MessageTo::toMessage);
192 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
195 Path chatroomPath(ChatRoomInfoTo infoTo)
197 return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));