this::logFailure)));
}
- void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- default void writeChatRoomData(
+ default Flux<Message> writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux,
SuccessCallback successCallback,
FailureCallback failureCallback)
{
- writeChatRoomData(
+ return writeChatRoomData(
chatRoomId,
messageFlux
.doOnComplete(() -> successCallback.accept(chatRoomId))
.doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
}
- void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
interface SuccessCallback extends Consumer<UUID> {}
@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatRoomInfoFlux
+ return chatRoomInfoFlux
.log()
.doFirst(() ->
{
{
throw new RuntimeException(e);
}
- })
- .subscribe();
+ });
}
catch (IOException e)
{
}
@Override
- public void writeChatRoomData(
+ public Flux<Message> writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux)
{
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- messageFlux
+ return messageFlux
.log()
.doFirst(() ->
{
{
throw new RuntimeException(e);
}
- })
- .subscribe();
+ });
}
catch (IOException e)
{
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
+import java.util.UUID;
+
@AllArgsConstructor
@NoArgsConstructor
private String id;
private String name;
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(
+ UUID.fromString(id),
+ name);
+ }
+
public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
- chatRoomInfoFlux
+ return chatRoomInfoFlux
.map(ChatRoomTo::from)
.map(chatroomTo -> chatRoomRepository.save(chatroomTo))
- .subscribe();
+ .map(ChatRoomTo::toChatRoomInfo);
}
@Override
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
{
- messageFlux
+ return messageFlux
.map(message -> MessageTo.from(chatRoomId, message))
.map(messageTo -> messageRepository.save(messageTo))
- .subscribe();
+ .map(MessageTo::toMessage);
}
@Override
return new StorageStrategy()
{
@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ {
+ return chatRoomInfoFlux;
+ }
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+ public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ return messageFlux;
+ }
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)