1 package de.juplo.kafka.chat.backend.persistence;
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.core.JsonParser;
5 import com.fasterxml.jackson.core.JsonToken;
6 import com.fasterxml.jackson.databind.ObjectMapper;
7 import de.juplo.kafka.chat.backend.api.ChatroomTo;
8 import de.juplo.kafka.chat.backend.api.MessageTo;
9 import de.juplo.kafka.chat.backend.domain.Chatroom;
10 import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
11 import de.juplo.kafka.chat.backend.domain.Message;
12 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
13 import lombok.RequiredArgsConstructor;
14 import lombok.extern.slf4j.Slf4j;
15 import reactor.core.publisher.Flux;
16 import reactor.core.publisher.Sinks;
18 import java.io.IOException;
19 import java.nio.file.Files;
20 import java.nio.file.NoSuchFileException;
21 import java.nio.file.Path;
22 import java.util.LinkedHashMap;
23 import java.util.function.Function;
24 import java.util.stream.Collectors;
26 import static java.nio.file.StandardOpenOption.CREATE;
27 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
30 @RequiredArgsConstructor
32 public class LocalJsonFilesStorageStrategy implements StorageStrategy
34 public static final String CHATROOMS_FILENAME = "chatrooms.json";
37 private final Path storagePath;
38 private final ObjectMapper mapper;
39 private final ChatroomFactory chatroomFactory;
43 public void writeChatrooms(Flux<Chatroom> chatroomFlux)
45 Path path = chatroomsPath();
46 log.info("Writing chatrooms to {}", path);
49 Files.createDirectories(storagePath);
51 JsonGenerator generator =
54 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
62 generator.useDefaultPrettyPrinter();
63 generator.writeStartArray();
67 throw new RuntimeException(e);
74 generator.writeEndArray();
79 throw new RuntimeException(e);
82 .subscribe(chatroom ->
86 ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
87 generator.writeObject(chatroomTo);
88 writeMessages(chatroomTo, chatroom.getMessages());
92 throw new RuntimeException(e);
98 throw new RuntimeException(e);
103 public Flux<Chatroom> readChatrooms()
105 Path path = chatroomsPath();
106 log.info("Reading chatrooms from {}", path);
112 .createParser(Files.newBufferedReader(path));
114 if (parser.nextToken() != JsonToken.START_ARRAY)
115 throw new IllegalStateException("Expected content to be an array");
117 Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
119 while (parser.nextToken() != JsonToken.END_ARRAY)
122 .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
126 many.tryEmitComplete().orThrow();
132 LinkedHashMap<Message.MessageKey, Message> messages =
133 readMessages(chatroomTo)
134 .collect(Collectors.toMap(
137 (existing, message) ->
139 if (!message.equals(existing))
140 throw new MessageMutationException(message, existing);
145 InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
146 return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
149 catch (NoSuchFileException e)
151 log.info("{} does not exist - starting with empty ChatHome", path);
154 catch (IOException e)
156 throw new RuntimeException(e);
161 public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
163 Path path = chatroomPath(chatroomTo);
164 log.info("Writing messages for {} to {}", chatroomTo, path);
167 Files.createDirectories(storagePath);
169 JsonGenerator generator =
172 .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
180 generator.useDefaultPrettyPrinter();
181 generator.writeStartArray();
183 catch (IOException e)
185 throw new RuntimeException(e);
192 generator.writeEndArray();
195 catch (IOException e)
197 throw new RuntimeException(e);
200 .subscribe(message ->
204 MessageTo messageTo = MessageTo.from(message);
205 generator.writeObject(messageTo);
207 catch (IOException e)
209 throw new RuntimeException(e);
213 catch (IOException e)
215 throw new RuntimeException(e);
220 public Flux<Message> readMessages(ChatroomTo chatroomTo)
222 Path path = chatroomPath(chatroomTo);
223 log.info("Reading messages for {} from {}", chatroomTo, path);
229 .createParser(Files.newBufferedReader(path));
231 if (parser.nextToken() != JsonToken.START_ARRAY)
232 throw new IllegalStateException("Expected content to be an array");
234 Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
236 while (parser.nextToken() != JsonToken.END_ARRAY)
239 .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
243 many.tryEmitComplete().orThrow();
245 return many.asFlux();
247 catch (NoSuchFileException e)
250 "{} does not exist - starting with empty chat for {}",
255 catch (IOException e)
257 throw new RuntimeException(e);
263 return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
266 Path chatroomPath(ChatroomTo chatroomTo)
268 return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));