- Path path = chatroomsPath();
- log.info("Reading chatrooms from {}", path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many
- .asFlux()
- .map(chatroomInfo ->
- {
- LinkedHashMap<Message.MessageKey, Message> messages =
- readMessages(chatroomInfo)
- .collect(Collectors.toMap(
- Message::getKey,
- Function.identity(),
- (existing, message) ->
- {
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return existing;
- },
- LinkedHashMap::new))
- .block();
- InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
- return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
- });
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatroomTo ->
+ {
+ InMemoryChatroomService chatroomService =
+ new InMemoryChatroomService(readMessages(chatroomTo));
+ return chatroomFactory.restoreChatroom(
+ chatroomTo.getId(),
+ chatroomTo.getName(),
+ chatroomService);
+ });