1 package de.juplo.kafka.chat.backend.persistence;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoom;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import reactor.core.publisher.Flux;
14 import java.io.IOException;
15 import java.nio.file.Files;
16 import java.nio.file.Path;
17 import java.time.Clock;
19 import static java.nio.file.StandardOpenOption.CREATE;
20 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
23 @RequiredArgsConstructor
25 public class LocalJsonFilesStorageStrategy implements StorageStrategy
27 public static final String CHATROOMS_FILENAME = "chatrooms.json";
30 private final Path storagePath;
31 private final Clock clock;
32 private final int bufferSize;
33 private final ObjectMapper mapper;
37 public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
39 Path path = chatroomsPath();
40 log.info("Writing chatrooms to {}", path);
43 Files.createDirectories(storagePath);
45 JsonGenerator generator =
48 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
56 generator.useDefaultPrettyPrinter();
57 generator.writeStartArray();
61 throw new RuntimeException(e);
68 generator.writeEndArray();
73 throw new RuntimeException(e);
76 .subscribe(chatroom ->
80 ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
81 generator.writeObject(chatroomTo);
82 writeMessages(chatroomTo, chatroom.getMessages());
86 throw new RuntimeException(e);
92 throw new RuntimeException(e);
97 public Flux<ChatRoom> readChatrooms()
99 JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
101 .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
105 InMemoryChatRoomService chatroomService =
106 new InMemoryChatRoomService(readMessages(chatRoomTo));
109 chatRoomTo.getName(),
117 public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
119 Path path = chatroomPath(chatroomTo);
120 log.info("Writing messages for {} to {}", chatroomTo, path);
123 Files.createDirectories(storagePath);
125 JsonGenerator generator =
128 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
136 generator.useDefaultPrettyPrinter();
137 generator.writeStartArray();
139 catch (IOException e)
141 throw new RuntimeException(e);
148 generator.writeEndArray();
151 catch (IOException e)
153 throw new RuntimeException(e);
156 .subscribe(message ->
160 MessageTo messageTo = MessageTo.from(message);
161 generator.writeObject(messageTo);
163 catch (IOException e)
165 throw new RuntimeException(e);
169 catch (IOException e)
171 throw new RuntimeException(e);
176 public Flux<Message> readMessages(ChatRoomTo chatroomTo)
178 JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
180 .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
182 .map(MessageTo::toMessage);
187 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
190 Path chatroomPath(ChatRoomTo chatroomTo)
192 return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));