package de.juplo.kafka.chat.backend.persistence;
import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
-import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final ObjectMapper mapper;
- private final ChatroomFactory chatroomFactory;
+ private final InMemoryChatHomeService service;
@Override
- public void writeChatrooms(Flux<Chatroom> chatroomFlux)
+ public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
{
try
{
- ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom);
- generator.writeObject(chatroomInfo);
- writeMessages(chatroomInfo, chatroom.getMessages());
+ ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
+ generator.writeObject(chatroomTo);
+ writeMessages(chatroomTo, chatroom.getMessages());
}
catch (IOException e)
{
}
@Override
- public Flux<Chatroom> readChatrooms()
+ public Flux<ChatRoom> readChatrooms()
{
- Path path = chatroomsPath();
- log.info("Reading chatrooms from {}", path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many
- .asFlux()
- .map(chatroomInfo ->
- {
- LinkedHashMap<Message.MessageKey, Message> messages =
- readMessages(chatroomInfo)
- .collect(Collectors.toMap(
- Message::getKey,
- Function.identity(),
- (existing, message) ->
- {
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return existing;
- },
- LinkedHashMap::new))
- .block();
- InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
- return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
- });
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatRoomTo ->
+ {
+ InMemoryChatRoomService chatroomService =
+ new InMemoryChatRoomService(readMessages(chatRoomTo));
+ return service.restoreChatroom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ chatroomService);
+ });
}
@Override
- public void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux)
+ public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
{
- Path path = chatroomPath(chatroomInfo);
- log.info("Writing messages for {} to {}", chatroomInfo, path);
+ Path path = chatroomPath(chatroomTo);
+ log.info("Writing messages for {} to {}", chatroomTo, path);
try
{
Files.createDirectories(storagePath);
}
@Override
- public Flux<Message> readMessages(ChatroomInfo chatroomInfo)
+ public Flux<Message> readMessages(ChatRoomTo chatroomTo)
{
- Path path = chatroomPath(chatroomInfo);
- log.info("Reading messages for {} from {}", chatroomInfo, path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many.asFlux();
- }
- catch (NoSuchFileException e)
- {
- log.info(
- "{} does not exist - starting with empty chat for {}",
- path,
- chatroomInfo);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
}
Path chatroomsPath()
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatroomInfo chatroomInfo)
+ Path chatroomPath(ChatRoomTo chatroomTo)
{
- return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
}
}