@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatRoomInfoFlux
+ return chatRoomInfoFlux
.log()
.doFirst(() ->
{
throw new RuntimeException(e);
}
})
- .subscribe(chatRoomInfo ->
+ .map(chatRoomInfo ->
{
try
{
ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
generator.writeObject(chatRoomInfoTo);
+ return chatRoomInfo;
}
catch (IOException e)
{
}
@Override
- public void writeChatRoomData(
+ public Flux<Message> writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux)
{
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- messageFlux
+ return messageFlux
.log()
.doFirst(() ->
{
throw new RuntimeException(e);
}
})
- .subscribe(message ->
+ .map(message ->
{
try
{
MessageTo messageTo = MessageTo.from(message);
generator.writeObject(messageTo);
+ return message;
}
catch (IOException e)
{