1 package de.juplo.kafka.chat.backend.persistence;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import reactor.core.publisher.Flux;
14 import java.io.IOException;
15 import java.nio.file.Files;
16 import java.nio.file.Path;
18 import static java.nio.file.StandardOpenOption.CREATE;
19 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
/**
 * {@link StorageStrategy} implementation that works on JSON files resolved
 * against a local directory: one file named {@link #CHATROOMS_FILENAME} for
 * the chat rooms and one {@code <id>.json} file per chat room for its messages.
 */
22 @RequiredArgsConstructor
24 public class LocalJsonFilesStorageStrategy implements StorageStrategy
// Name of the file (resolved against storagePath) that holds the chat rooms.
26 public static final String CHATROOMS_FILENAME = "chatrooms.json";
// Directory that every JSON file of this strategy is resolved against.
29 private final Path storagePath;
// Jackson mapper; supplies the type descriptors and is handed to JsonFilePublisher.
30 private final ObjectMapper mapper;
// Service whose restoreChatroom(...) is called for each chat room that is read back.
31 private final InMemoryChatHomeService service;
/**
 * Writes the given chat rooms as a JSON array into the chatrooms file and,
 * for every chat room, delegates the writing of its messages to
 * {@link #writeMessages(ChatRoomTo, Flux)}.
 *
 * @param chatroomFlux the chat rooms to write; they are consumed by
 *                     subscribing to the flux
 * @throws RuntimeException thrown in place of a failure of the file or JSON
 *                          operations (NOTE(review): the matching catch
 *                          clauses are not part of this excerpt -- confirm
 *                          that they catch IOException)
 */
35 public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
// Target file: CHATROOMS_FILENAME resolved against storagePath.
37 Path path = chatroomsPath();
38 log.info("Writing chatrooms to {}", path);
// Make sure the storage directory exists before the file is opened.
41 Files.createDirectories(storagePath);
// CREATE + TRUNCATE_EXISTING: an already existing file is overwritten.
43 JsonGenerator generator =
46 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Opens the JSON array that will receive one element per chat room.
// NOTE(review): presumably runs in a callback before the first element -- confirm.
54 generator.useDefaultPrettyPrinter();
55 generator.writeStartArray();
59 throw new RuntimeException(e);
// Closes the JSON array.
// NOTE(review): verify that the generator is also closed/flushed here; no close() call is visible in this excerpt.
66 generator.writeEndArray();
71 throw new RuntimeException(e);
// Per chat room: convert to the transfer object, append it to the array and
// write the messages of that chat room to their own file.
74 .subscribe(chatroom ->
78 ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
79 generator.writeObject(chatroomTo);
80 writeMessages(chatroomTo, chatroom.getMessages());
84 throw new RuntimeException(e);
90 throw new RuntimeException(e);
/**
 * Reads the chat rooms back from the chatrooms file.
 *
 * @return a flux of chat rooms; each one is obtained from
 *         {@code service.restoreChatroom(...)}
 */
95 public Flux<ChatRoom> readChatrooms()
// Jackson type descriptor for the elements expected in the file.
97 JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
// The publisher is given the chatrooms file, the mapper and the element type.
99 .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
// The messages of the chat room are read from its own file and passed to
// the chat room service that is created for it.
103 InMemoryChatRoomService chatroomService =
104 new InMemoryChatRoomService(readMessages(chatRoomTo));
// NOTE(review): only the name argument is visible in this excerpt; the remaining arguments of restoreChatroom are not shown.
105 return service.restoreChatroom(
107 chatRoomTo.getName(),
/**
 * Writes the given messages as a JSON array into the file that belongs to
 * the given chat room.
 *
 * @param chatroomTo  the chat room the messages belong to; determines the
 *                    target file via {@link #chatroomPath(ChatRoomTo)}
 * @param messageFlux the messages to write; they are consumed by subscribing
 *                    to the flux
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *                          creating the directory, opening the file or
 *                          writing JSON
 */
113 public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
// Target file: "<id>.json" resolved against storagePath.
115 Path path = chatroomPath(chatroomTo);
116 log.info("Writing messages for {} to {}", chatroomTo, path);
// Make sure the storage directory exists before the file is opened.
119 Files.createDirectories(storagePath);
// CREATE + TRUNCATE_EXISTING: an already existing file is overwritten.
121 JsonGenerator generator =
124 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
// Opens the JSON array that will receive one element per message.
// NOTE(review): presumably runs in a callback before the first element -- confirm.
132 generator.useDefaultPrettyPrinter();
133 generator.writeStartArray();
135 catch (IOException e)
137 throw new RuntimeException(e);
// Closes the JSON array.
// NOTE(review): verify that the generator is also closed/flushed here; no close() call is visible in this excerpt.
144 generator.writeEndArray();
147 catch (IOException e)
149 throw new RuntimeException(e);
// Per message: convert to the transfer object and append it to the array.
152 .subscribe(message ->
156 MessageTo messageTo = MessageTo.from(message);
157 generator.writeObject(messageTo);
159 catch (IOException e)
161 throw new RuntimeException(e);
165 catch (IOException e)
167 throw new RuntimeException(e);
/**
 * Reads the messages of the given chat room back from that chat room's file.
 *
 * @param chatroomTo the chat room whose messages are read; determines the
 *                   source file via {@link #chatroomPath(ChatRoomTo)}
 * @return a flux of the messages, each converted with {@code MessageTo.toMessage()}
 */
172 public Flux<Message> readMessages(ChatRoomTo chatroomTo)
// Jackson type descriptor for the elements expected in the file.
174 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
// The publisher is given the chat room's file, the mapper and the element type.
176 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
// Convert every transfer object into a domain message.
178 .map(MessageTo::toMessage);
// Location of the chatrooms file: CHATROOMS_FILENAME resolved against storagePath.
183 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
/**
 * Determines the file that holds the messages of the given chat room.
 *
 * @param chatroomTo the chat room; its id (via {@code getId().toString()})
 *                   becomes the file name
 * @return {@code <id>.json} resolved against {@code storagePath}
 */
186 Path chatroomPath(ChatRoomTo chatroomTo)
188 return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));