1 package de.juplo.kafka.chat.backend.persistence.filestorage;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
11 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.time.Clock;
21 import static java.nio.file.StandardOpenOption.CREATE;
22 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
/**
 * {@link StorageStrategy} that persists chat rooms and their messages as
 * JSON files below {@code storagePath}: one file for the list of chat rooms
 * and one file per chat room for its messages.
 * <p>
 * The constructor is generated by Lombok's {@code @RequiredArgsConstructor}
 * from the {@code final} fields declared below.
 */
25 @RequiredArgsConstructor
27 public class FileStorageStrategy implements StorageStrategy
// Name of the JSON file, resolved against storagePath, that stores the chat rooms.
29 public static final String CHATROOMS_FILENAME = "chatrooms.json";
// Base directory: created on demand before writing and used to resolve all file names.
32 private final Path storagePath;
// NOTE(review): no use of clock is visible in this listing; presumably it is
// handed to the ChatRoom instances restored in readChatrooms() - confirm.
33 private final Clock clock;
// NOTE(review): no use of bufferSize is visible in this listing; presumably it is
// handed to the ChatRoom instances restored in readChatrooms() - confirm.
34 private final int bufferSize;
// Jackson mapper: supplies the JavaType for deserialisation and is passed to JsonFilePublisher.
35 private final ObjectMapper mapper;
/**
 * Writes the chat rooms emitted by {@code chatroomFlux} as a pretty-printed
 * JSON array to the file returned by {@code chatroomsPath()} and hands the
 * messages of every chat room to {@code writeMessages(ChatRoomTo, Flux)}.
 * <p>
 * Failures are rethrown wrapped in a {@link RuntimeException}. The catch
 * clauses are not visible in this listing; presumably they catch
 * {@link IOException}, as in {@code writeMessages} - confirm.
 * <p>
 * NOTE(review): several lines of this method are missing from the listing,
 * in particular the calls that wrap the start-of-array and end-of-array
 * steps and the receiver of {@code .subscribe(...)}. When exactly those
 * steps run cannot be stated from here.
 * <p>
 * NOTE(review): the method returns {@code void} right after subscribing;
 * whether all data has been written when it returns depends on how the
 * given Flux is scheduled - confirm that callers do not rely on completion.
 *
 * @param chatroomFlux the chat rooms to persist
 */
39 public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
41 Path path = chatroomsPath();
42 log.info("Writing chatrooms to {}", path);
// Make sure the base directory exists before the file is opened.
45 Files.createDirectories(storagePath);
47 JsonGenerator generator =
// CREATE + TRUNCATE_EXISTING: the file is created if missing, otherwise its old content is discarded.
50 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Opening step: enable pretty printing and open the surrounding JSON array.
58 generator.useDefaultPrettyPrinter();
59 generator.writeStartArray();
63 throw new RuntimeException(e);
// Closing step: terminate the JSON array.
// NOTE(review): no close()/flush() of the generator is visible in this listing -
// verify that it is closed in the omitted lines, otherwise buffered output may be lost.
70 generator.writeEndArray();
75 throw new RuntimeException(e);
// Per-element step: serialise the chat room and trigger writing of its messages.
78 .subscribe(chatroom ->
82 ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
83 generator.writeObject(chatroomTo);
// The messages go to a separate file, see chatroomPath(ChatRoomTo).
84 writeMessages(chatroomTo, chatroom.getMessages());
88 throw new RuntimeException(e);
94 throw new RuntimeException(e);
/**
 * Restores the chat rooms from the file returned by {@code chatroomsPath()}.
 * <p>
 * The file is read through a {@code JsonFilePublisher} that yields
 * {@link ChatRoomTo} objects. For each of them an
 * {@link InMemoryChatRoomService} is created from the messages returned by
 * {@code readMessages(chatRoomTo)}.
 * <p>
 * NOTE(review): the lines that build the returned {@link ChatRoom} are only
 * partly visible in this listing (only the {@code getName()} argument is
 * shown); presumably id, clock, service and buffer size are passed as well -
 * confirm.
 *
 * @return the restored chat rooms
 */
99 public Flux<ChatRoom> readChatrooms()
// Target type handed to the publisher for deserialising each array element.
101 JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
103 .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
// The service of a restored chat room is fed from the room's own message file.
107 InMemoryChatRoomService chatroomService =
108 new InMemoryChatRoomService(readMessages(chatRoomTo));
111 chatRoomTo.getName(),
/**
 * Writes the messages emitted by {@code messageFlux} as a pretty-printed
 * JSON array to the file returned by {@code chatroomPath(chatroomTo)}.
 * <p>
 * Every {@link IOException} caught in the visible handlers is rethrown
 * wrapped in a {@link RuntimeException}.
 * <p>
 * NOTE(review): several lines of this method are missing from the listing,
 * in particular the calls that wrap the start-of-array and end-of-array
 * steps and the receiver of {@code .subscribe(...)}. When exactly those
 * steps run cannot be stated from here.
 *
 * @param chatroomTo the chat room the messages belong to; determines the file name
 * @param messageFlux the messages to persist
 */
119 public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
121 Path path = chatroomPath(chatroomTo);
122 log.info("Writing messages for {} to {}", chatroomTo, path);
// Make sure the base directory exists before the file is opened.
125 Files.createDirectories(storagePath);
127 JsonGenerator generator =
// CREATE + TRUNCATE_EXISTING: the file is created if missing, otherwise its old content is discarded.
130 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Opening step: enable pretty printing and open the surrounding JSON array.
138 generator.useDefaultPrettyPrinter();
139 generator.writeStartArray();
141 catch (IOException e)
143 throw new RuntimeException(e);
// Closing step: terminate the JSON array.
// NOTE(review): no close()/flush() of the generator is visible in this listing -
// verify that it is closed in the omitted lines, otherwise buffered output may be lost.
150 generator.writeEndArray();
153 catch (IOException e)
155 throw new RuntimeException(e);
// Per-element step: convert the message to its transfer object and serialise it.
158 .subscribe(message ->
162 MessageTo messageTo = MessageTo.from(message);
163 generator.writeObject(messageTo);
165 catch (IOException e)
167 throw new RuntimeException(e);
171 catch (IOException e)
173 throw new RuntimeException(e);
/**
 * Restores the messages of a chat room from the file returned by
 * {@code chatroomPath(chatroomTo)}.
 * <p>
 * The file is read through a {@code JsonFilePublisher} that yields
 * {@link MessageTo} objects, each of which is converted with
 * {@code MessageTo::toMessage}.
 * <p>
 * NOTE(review): the line between {@code .from(...)} and {@code .map(...)} is
 * not visible in this listing.
 *
 * @param chatroomTo the chat room whose messages are read; determines the file name
 * @return the restored messages
 */
178 public Flux<Message> readMessages(ChatRoomTo chatroomTo)
// Target type handed to the publisher for deserialising each array element.
180 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
182 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
184 .map(MessageTo::toMessage);
// Location of the chat-room list: CHATROOMS_FILENAME resolved against storagePath.
// NOTE(review): the enclosing method header is not visible in this listing;
// presumably this is the body of chatroomsPath(), which the read/write methods call - confirm.
189 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
192 Path chatroomPath(ChatRoomTo chatroomTo)
194 return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));