1 package de.juplo.kafka.chat.backend.persistence.storage.files;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
9 import de.juplo.kafka.chat.backend.domain.ChatRoom;
10 import de.juplo.kafka.chat.backend.domain.Message;
11 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.time.Clock;
20 import java.util.UUID;
22 import static java.nio.file.StandardOpenOption.CREATE;
23 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
/**
 * {@link StorageStrategy} implementation that keeps chat rooms and their
 * messages in JSON files below {@code storagePath}: one file named
 * {@link #CHATROOMS_FILENAME} for the chat rooms and one
 * {@code <chat-room-id>.json} file per chat room for its messages.
 *
 * <p>All collaborators are {@code final} fields; with
 * {@code @RequiredArgsConstructor} Lombok generates the constructor that
 * receives them.
 */
26 @RequiredArgsConstructor
28 public class FilesStorageStrategy implements StorageStrategy
// File name, resolved against storagePath, of the file that holds the chat rooms.
30 public static final String CHATROOMS_FILENAME = "chatrooms.json";
// Directory that is created on write and against which all file names are resolved.
33 private final Path storagePath;
// NOTE(review): not referenced in the lines visible in this excerpt — confirm where it is used.
34 private final Clock clock;
// NOTE(review): not referenced in the lines visible in this excerpt — confirm where it is used.
35 private final int bufferSize;
// Selects the shard for a chat-room id while reading chat rooms.
36 private final ShardingStrategy shardingStrategy;
// Invoked while reading chat rooms with the message stream of each room.
37 private final ChatRoomServiceFactory factory;
// Jackson mapper: builds the JavaType for the transfer objects and is handed to JsonFilePublisher.
38 private final ObjectMapper mapper;
/**
 * Writes chat rooms as one pretty-printed JSON array into the chat-rooms
 * file and stores the messages of every chat room in its own file.
 *
 * <p>The storage directory is created if necessary. The target file is
 * opened with {@code CREATE} and {@code TRUNCATE_EXISTING}, so an existing
 * file is overwritten. Every {@link IOException} caught here is rethrown
 * wrapped in a {@link RuntimeException}.
 *
 * <p>NOTE(review): the chat rooms are written from inside a
 * {@code subscribe} callback, so this method presumably returns before
 * all elements are written, and no error callback is visible in this
 * excerpt — confirm how failures of the stream are reported to callers.
 *
 * @param chatroomFlux the chat rooms to persist
 */
42 public void write(Flux<ChatRoom> chatroomFlux)
44 Path path = chatroomsPath();
45 log.info("Writing chatrooms to {}", path);
// Make sure the storage directory exists before the file is opened.
48 Files.createDirectories(storagePath);
// CREATE + TRUNCATE_EXISTING: create the file if missing, otherwise empty it first.
50 JsonGenerator generator =
53 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Enable pretty printing and open the top-level JSON array.
61 generator.useDefaultPrettyPrinter();
62 generator.writeStartArray();
66 throw new RuntimeException(e);
// Close the top-level JSON array.
// NOTE(review): no generator.close()/flush is visible in this excerpt — confirm the
// underlying writer is closed when the stream completes and when it fails.
73 generator.writeEndArray();
78 throw new RuntimeException(e);
41 .subscribe(chatroom ->
// Convert to the transfer object, append it to the array, then persist the
// messages of this chat room into their own file.
85 ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
86 generator.writeObject(chatroomTo);
87 writeMessages(chatroomTo, chatroom.getMessages());
91 throw new RuntimeException(e);
97 throw new RuntimeException(e);
/**
 * Reads the chat rooms back from the chat-rooms file.
 *
 * <p>A {@code JsonFilePublisher} publishes the {@code ChatRoomTo} elements
 * of the file. For each element the shard is selected from the chat-room
 * id, and {@code factory.create(...)} is called with the messages read by
 * {@link #readMessages(ChatRoomTo)}.
 *
 * <p>NOTE(review): the call that receives the name, the shard and the
 * created service is not visible in this excerpt — presumably the
 * {@code ChatRoom} constructor; confirm its argument order.
 *
 * @return the chat rooms restored from the file
 */
102 public Flux<ChatRoom> read()
// Element type that the publisher deserializes from the JSON file.
104 JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
106 .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
110 UUID chatRoomId = chatRoomTo.getId();
// The shard is derived from the chat-room id by the sharding strategy.
111 int shard = shardingStrategy.selectShard(chatRoomId);
114 chatRoomTo.getName(),
// The messages of the room are read from the room's own file.
117 factory.create(readMessages(chatRoomTo)),
/**
 * Writes the messages of one chat room as a pretty-printed JSON array into
 * the file of that chat room.
 *
 * <p>The storage directory is created if necessary. The target file is
 * opened with {@code CREATE} and {@code TRUNCATE_EXISTING}, so an existing
 * file is overwritten. Every {@link IOException} caught here is rethrown
 * wrapped in a {@link RuntimeException}.
 *
 * <p>NOTE(review): the messages are written from inside a
 * {@code subscribe} callback, so this method presumably returns before
 * all elements are written, and no error callback is visible in this
 * excerpt — confirm how failures of the stream are reported to callers.
 *
 * @param chatroomTo  transfer object of the chat room; determines the target file
 * @param messageFlux the messages to persist
 */
122 public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
124 Path path = chatroomPath(chatroomTo);
125 log.info("Writing messages for {} to {}", chatroomTo, path);
// Make sure the storage directory exists before the file is opened.
128 Files.createDirectories(storagePath);
// CREATE + TRUNCATE_EXISTING: create the file if missing, otherwise empty it first.
130 JsonGenerator generator =
133 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Enable pretty printing and open the top-level JSON array.
141 generator.useDefaultPrettyPrinter();
142 generator.writeStartArray();
144 catch (IOException e)
146 throw new RuntimeException(e);
// Close the top-level JSON array.
// NOTE(review): no generator.close()/flush is visible in this excerpt — confirm the
// underlying writer is closed when the stream completes and when it fails.
153 generator.writeEndArray();
156 catch (IOException e)
158 throw new RuntimeException(e);
161 .subscribe(message ->
// Convert each message to its transfer object and append it to the array.
165 MessageTo messageTo = MessageTo.from(message);
166 generator.writeObject(messageTo);
168 catch (IOException e)
170 throw new RuntimeException(e);
174 catch (IOException e)
176 throw new RuntimeException(e);
/**
 * Reads the messages of one chat room from the file of that chat room.
 *
 * <p>A {@code JsonFilePublisher} publishes the {@code MessageTo} elements
 * of the file; each of them is converted with {@code MessageTo::toMessage}.
 *
 * @param chatroomTo transfer object of the chat room; determines the source file
 * @return the messages stored for the chat room
 */
180 public Flux<Message> readMessages(ChatRoomTo chatroomTo)
// Element type that the publisher deserializes from the JSON file.
182 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
184 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
186 .map(MessageTo::toMessage);
// Resolves the chat-rooms file (CHATROOMS_FILENAME) against the storage directory.
// NOTE(review): the enclosing method signature is not visible in this excerpt —
// presumably chatroomsPath(); confirm.
191 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
/**
 * Returns the path of the file that holds the messages of the given chat
 * room: the chat-room id followed by {@code .json}, resolved against the
 * storage directory.
 *
 * @param chatroomTo transfer object of the chat room whose id names the file
 * @return the path of the messages file of the chat room
 */
194 Path chatroomPath(ChatRoomTo chatroomTo)
196 return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));