package de.juplo.kafka.chat.backend.persistence;
import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatroomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
-import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final ObjectMapper mapper;
- private final ChatroomFactory chatroomFactory;
+ private final InMemoryChatHomeService service;
@Override
- public void writeChatrooms(Flux<Chatroom> chatroomFlux)
+ public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
}
@Override
- public Flux<Chatroom> readChatrooms()
+ public Flux<ChatRoom> readChatrooms()
{
- Path path = chatroomsPath();
- log.info("Reading chatrooms from {}", path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many
- .asFlux()
- .map(chatroomTo ->
- {
- LinkedHashMap<Message.MessageKey, Message> messages =
- readMessages(chatroomTo)
- .collect(Collectors.toMap(
- Message::getKey,
- Function.identity(),
- (existing, message) ->
- {
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return existing;
- },
- LinkedHashMap::new))
- .block();
- InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
- return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
- });
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatroomTo ->
+ {
+ InMemoryChatroomService chatroomService =
+ new InMemoryChatroomService(readMessages(chatroomTo));
+ return service.restoreChatroom(
+ chatroomTo.getId(),
+ chatroomTo.getName(),
+ chatroomService);
+ });
}
@Override
@Override
public Flux<Message> readMessages(ChatroomTo chatroomTo)
{
- Path path = chatroomPath(chatroomTo);
- log.info("Reading messages for {} from {}", chatroomTo, path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many.asFlux();
- }
- catch (NoSuchFileException e)
- {
- log.info(
- "{} does not exist - starting with empty chat for {}",
- path,
- chatroomTo);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
}
Path chatroomsPath()