1 package de.juplo.kafka.chat.backend.storage.files;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
20 import java.util.logging.Level;
22 import static java.nio.file.StandardOpenOption.CREATE;
23 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
/**
 * {@link StorageStrategy} implementation whose visible code writes and reads
 * JSON files that are resolved against {@code storagePath}: one file for the
 * list of chat rooms and one file per chat room for its messages.
 */
// NOTE(review): this listing has gaps (braces and several statements are not
// shown), so the comments below describe only what the visible lines establish.
26 @RequiredArgsConstructor
28 public class FilesStorageStrategy implements StorageStrategy
// File name, resolved against storagePath, of the file that holds the chat rooms.
30 public static final String CHATROOMS_FILENAME = "chatrooms.json";
// Base directory: created on demand by the write methods and used to resolve all file names.
33 private final Path storagePath;
// Used by readChatRoomInfo() to compute the shard for each chat-room id.
34 private final ShardingStrategy shardingStrategy;
// Provides the JavaType for the read methods and is handed to JsonFilePublisher.
// Presumably also the source of the JsonGenerator in the write methods - the
// receiver of createGenerator(...) is not shown; confirm.
35 private final ObjectMapper mapper;
// Not referenced on any visible line; presumably passed to the Flux logging
// calls together with showOperatorLine - TODO confirm.
36 private final Level loggingLevel;
37 private final boolean showOperatorLine;
/**
 * Writes the chat rooms of the given flux as a pretty-printed JSON array into
 * the chat-rooms file (see {@code chatroomsPath()}).
 * Each element is converted to a {@link ChatRoomInfoTo} before it is written.
 *
 * @param chatRoomInfoFlux the chat rooms to write
 * @return a flux built on top of {@code chatRoomInfoFlux}; the operators that
 *         are attached to it are not fully visible in this listing
 * @throws RuntimeException wrapping the caught exception {@code e} as its cause
 *         (presumably an {@link IOException}, as in writeChatRoomData - the
 *         catch clauses of this method are not shown)
 */
41 public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
43 Path path = chatroomsPath();
44 log.info("Writing chatrooms to {}", path);
// Make sure the storage directory exists before the file is opened.
47 Files.createDirectories(storagePath);
// CREATE + TRUNCATE_EXISTING: an already existing file is overwritten.
// NOTE(review): in this listing the generator is created before the return
// statement, so the file appears to be opened and truncated when the method
// is called rather than when the returned Flux is subscribed - confirm that
// this is intended.
49 JsonGenerator generator =
52 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
54 return chatRoomInfoFlux
// Argument of a call whose opening line is not shown; presumably the category
// of a Flux log(...) operator - confirm.
56 FilesStorageStrategy.class.getSimpleName(),
// Enable pretty printing and open the JSON array. The operator that triggers
// this is not visible in the listing.
63 generator.useDefaultPrettyPrinter();
64 generator.writeStartArray();
68 throw new RuntimeException(e);
// Close the JSON array.
// NOTE(review): no close() or flush() of the generator is visible in this
// listing; verify that the underlying writer is closed afterwards.
75 generator.writeEndArray();
80 throw new RuntimeException(e);
// Convert the domain object into its transfer object and append it to the array.
87 ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
88 generator.writeObject(chatRoomInfoTo);
93 throw new RuntimeException(e);
99 throw new RuntimeException(e);
/**
 * Reads {@link ChatRoomInfoTo} entries from the chat-rooms file through a
 * {@code JsonFilePublisher} and maps each entry to a {@link ChatRoomInfo}.
 * For every entry the shard is computed anew from the chat-room id via
 * {@code shardingStrategy.selectShard(...)}.
 *
 * @return a flux of the chat rooms that were read
 */
104 public Flux<ChatRoomInfo> readChatRoomInfo()
// Jackson type descriptor for the elements that the publisher deserializes.
106 JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
108 .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
// Argument of a call whose opening line is not shown; presumably the category
// of a Flux log(...) operator - confirm.
110 FilesStorageStrategy.class.getSimpleName(),
113 .map(chatRoomInfoTo ->
115 UUID chatRoomId = chatRoomInfoTo.getId();
// The shard is derived from the id instead of being taken from the stored entry.
116 int shard = shardingStrategy.selectShard(chatRoomId);
// Message template that reports the stored shard next to the newly computed
// one; the logging call that uses it is not visible in this listing.
119 "{} - old shard: {}, new shard: {}",
121 chatRoomInfoTo.getShard(),
// Only the name argument is visible here; presumably the id and the newly
// computed shard are passed as well - TODO confirm against the full file.
124 return new ChatRoomInfo(
126 chatRoomInfoTo.getName(),
/**
 * Writes the messages of the given flux as a pretty-printed JSON array into
 * the file of one chat room (see {@code chatroomPath(UUID)}).
 * Each message is converted to a {@link MessageTo} before it is written.
 * Every {@link IOException} caught here is rethrown as the cause of a
 * {@link RuntimeException}.
 * <p>
 * The declaration of {@code chatRoomId} is not visible in this listing; the
 * value is passed to {@code chatroomPath(UUID)} to select the target file.
 *
 * @param messageFlux the messages to write
 * @return presumably a flux built on top of {@code messageFlux} - the return
 *         statement is not visible in this listing
 */
132 public Flux<Message> writeChatRoomData(
134 Flux<Message> messageFlux)
136 Path path = chatroomPath(chatRoomId);
137 log.info("Writing messages for {} to {}", chatRoomId, path);
// Make sure the storage directory exists before the file is opened.
140 Files.createDirectories(storagePath);
// CREATE + TRUNCATE_EXISTING: an already existing file is overwritten.
// NOTE(review): the generator is created ahead of the pipeline that uses it,
// so the file appears to be opened and truncated when the method is called
// rather than on subscription - confirm that this is intended.
142 JsonGenerator generator =
145 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Argument of a call whose opening line is not shown; presumably the category
// of a Flux log(...) operator - confirm.
149 FilesStorageStrategy.class.getSimpleName(),
// Enable pretty printing and open the JSON array. The operator that triggers
// this is not visible in the listing.
156 generator.useDefaultPrettyPrinter();
157 generator.writeStartArray();
159 catch (IOException e)
161 throw new RuntimeException(e);
// Close the JSON array.
// NOTE(review): no close() or flush() of the generator is visible in this
// listing; verify that the underlying writer is closed afterwards.
168 generator.writeEndArray();
171 catch (IOException e)
173 throw new RuntimeException(e);
// Convert the domain object into its transfer object and append it to the array.
180 MessageTo messageTo = MessageTo.from(message);
181 generator.writeObject(messageTo);
184 catch (IOException e)
186 throw new RuntimeException(e);
// Handler for an enclosing try whose opening line is not shown; presumably it
// covers the directory creation and the opening of the file - confirm.
190 catch (IOException e)
192 throw new RuntimeException(e);
/**
 * Reads {@link MessageTo} entries from the file of the given chat room through
 * a {@code JsonFilePublisher} and converts each entry with
 * {@code MessageTo::toMessage}.
 *
 * @param chatRoomId id of the chat room; selects the file via {@code chatroomPath(UUID)}
 * @return a flux of the messages that were read
 */
197 public Flux<Message> readChatRoomData(UUID chatRoomId)
// Jackson type descriptor for the elements that the publisher deserializes.
199 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
201 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
// Argument of a call whose opening line is not shown; presumably the category
// of a Flux log(...) operator - confirm.
203 FilesStorageStrategy.class.getSimpleName(),
206 .map(MessageTo::toMessage);
// Location of the chat-rooms file: CHATROOMS_FILENAME resolved against storagePath.
// (The signature of the enclosing method is not visible in this listing.)
211 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
/**
 * Returns the location of the file for a single chat room: the textual form
 * of the id with the suffix {@code .json}, resolved against {@code storagePath}.
 *
 * @param id id of the chat room
 * @return the path of the file for that chat room
 */
214 Path chatroomPath(UUID id)
216 return storagePath.resolve(Path.of(id.toString() + ".json"));