1 package de.juplo.kafka.chat.backend.persistence;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatroomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.Chatroom;
9 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
10 import de.juplo.kafka.chat.backend.domain.Message;
11 import lombok.RequiredArgsConstructor;
12 import lombok.extern.slf4j.Slf4j;
13 import reactor.core.publisher.Flux;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
19 import static java.nio.file.StandardOpenOption.CREATE;
20 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
23 @RequiredArgsConstructor
25 public class LocalJsonFilesStorageStrategy implements StorageStrategy
27 public static final String CHATROOMS_FILENAME = "chatrooms.json";
30 private final Path storagePath;
31 private final ObjectMapper mapper;
32 private final ChatroomFactory chatroomFactory;
36 public void writeChatrooms(Flux<Chatroom> chatroomFlux)
38 Path path = chatroomsPath();
39 log.info("Writing chatrooms to {}", path);
42 Files.createDirectories(storagePath);
44 JsonGenerator generator =
47 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
55 generator.useDefaultPrettyPrinter();
56 generator.writeStartArray();
60 throw new RuntimeException(e);
67 generator.writeEndArray();
72 throw new RuntimeException(e);
75 .subscribe(chatroom ->
79 ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
80 generator.writeObject(chatroomTo);
81 writeMessages(chatroomTo, chatroom.getMessages());
85 throw new RuntimeException(e);
91 throw new RuntimeException(e);
96 public Flux<Chatroom> readChatrooms()
98 JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
100 .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
104 InMemoryPersistenceStrategy strategy =
105 new InMemoryPersistenceStrategy(readMessages(chatroomTo));
106 return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
111 public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
113 Path path = chatroomPath(chatroomTo);
114 log.info("Writing messages for {} to {}", chatroomTo, path);
117 Files.createDirectories(storagePath);
119 JsonGenerator generator =
122 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
130 generator.useDefaultPrettyPrinter();
131 generator.writeStartArray();
133 catch (IOException e)
135 throw new RuntimeException(e);
142 generator.writeEndArray();
145 catch (IOException e)
147 throw new RuntimeException(e);
150 .subscribe(message ->
154 MessageTo messageTo = MessageTo.from(message);
155 generator.writeObject(messageTo);
157 catch (IOException e)
159 throw new RuntimeException(e);
163 catch (IOException e)
165 throw new RuntimeException(e);
170 public Flux<Message> readMessages(ChatroomTo chatroomTo)
172 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
174 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
176 .map(MessageTo::toMessage);
181 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
184 Path chatroomPath(ChatroomTo chatroomTo)
186 return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));