package de.juplo.kafka.chat.backend.persistence;
import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatroomTo;
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
-import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final ObjectMapper mapper;
- private final ChatroomFactory chatroomFactory;
+ private final InMemoryChatHomeService service;
@Override
- public void writeChatrooms(Flux<Chatroom> chatroomFlux)
+ public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
{
try
{
- ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
+ ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
generator.writeObject(chatroomTo);
writeMessages(chatroomTo, chatroom.getMessages());
}
}
@Override
- public Flux<Chatroom> readChatrooms()
+ public Flux<ChatRoom> readChatrooms()
{
- Path path = chatroomsPath();
- log.info("Reading chatrooms from {}", path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many
- .asFlux()
- .map(chatroomTo ->
- {
- LinkedHashMap<Message.MessageKey, Message> messages =
- readMessages(chatroomTo)
- .collect(Collectors.toMap(
- Message::getKey,
- Function.identity(),
- (existing, message) ->
- {
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return existing;
- },
- LinkedHashMap::new))
- .block();
- InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
- return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
- });
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatRoomTo ->
+ {
+ InMemoryChatRoomService chatroomService =
+ new InMemoryChatRoomService(readMessages(chatRoomTo));
+ return service.restoreChatroom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ chatroomService);
+ });
}
@Override
- public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
+ public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
{
Path path = chatroomPath(chatroomTo);
log.info("Writing messages for {} to {}", chatroomTo, path);
}
@Override
- public Flux<Message> readMessages(ChatroomTo chatroomTo)
+ public Flux<Message> readMessages(ChatRoomTo chatroomTo)
{
- Path path = chatroomPath(chatroomTo);
- log.info("Reading messages for {} from {}", chatroomTo, path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many.asFlux();
- }
- catch (NoSuchFileException e)
- {
- log.info(
- "{} does not exist - starting with empty chat for {}",
- path,
- chatroomTo);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
}
Path chatroomsPath()
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatroomTo chatroomTo)
+ Path chatroomPath(ChatRoomTo chatroomTo)
{
return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
}