1 package de.juplo.kafka.chat.backend.storage.files;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
21 import static java.nio.file.StandardOpenOption.CREATE;
22 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
25 @RequiredArgsConstructor
27 public class FilesStorageStrategy implements StorageStrategy
29 public static final String CHATROOMS_FILENAME = "chatrooms.json";
32 private final Path storagePath;
33 private final ShardingStrategy shardingStrategy;
34 private final ObjectMapper mapper;
38 public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
40 Path path = chatroomsPath();
41 log.info("Writing chatrooms to {}", path);
44 Files.createDirectories(storagePath);
46 JsonGenerator generator =
49 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
57 generator.useDefaultPrettyPrinter();
58 generator.writeStartArray();
62 throw new RuntimeException(e);
69 generator.writeEndArray();
74 throw new RuntimeException(e);
81 ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
82 generator.writeObject(chatRoomInfoTo);
87 throw new RuntimeException(e);
94 throw new RuntimeException(e);
99 public Flux<ChatRoomInfo> readChatRoomInfo()
101 JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
103 .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
105 .map(chatRoomInfoTo ->
107 UUID chatRoomId = chatRoomInfoTo.getId();
108 int shard = shardingStrategy.selectShard(chatRoomId);
111 "{} - old shard: {}, new shard: {}",
113 chatRoomInfoTo.getShard(),
116 return new ChatRoomInfo(
118 chatRoomInfoTo.getName(),
124 public void writeChatRoomData(
126 Flux<Message> messageFlux)
128 Path path = chatroomPath(chatRoomId);
129 log.info("Writing messages for {} to {}", chatRoomId, path);
132 Files.createDirectories(storagePath);
134 JsonGenerator generator =
137 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
145 generator.useDefaultPrettyPrinter();
146 generator.writeStartArray();
148 catch (IOException e)
150 throw new RuntimeException(e);
157 generator.writeEndArray();
160 catch (IOException e)
162 throw new RuntimeException(e);
169 MessageTo messageTo = MessageTo.from(message);
170 generator.writeObject(messageTo);
173 catch (IOException e)
175 throw new RuntimeException(e);
180 catch (IOException e)
182 throw new RuntimeException(e);
187 public Flux<Message> readChatRoomData(UUID chatRoomId)
189 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
191 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
193 .map(MessageTo::toMessage);
198 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
201 Path chatroomPath(UUID id)
203 return storagePath.resolve(Path.of(id.toString() + ".json"));