@PreDestroy
public void onExit()
{
- storageStrategy.write(chatHomeService);
+ storageStrategy
+ .write(chatHomeService)
+ .subscribe();
}
public static void main(String[] args)
@PostMapping("/store")
public void store()
{
- storageStrategy.write(chatHomeService);
+ storageStrategy.write(chatHomeService).subscribe();
}
}
{
Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
- default void write(ChatHomeService chatHomeService)
+ default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
{
- write(
+ return write(
chatHomeService,
this::logSuccessChatHomeService,
this::logFailureChatHomeService);
}
- default void write(
+ default Flux<ChatRoomInfo> write(
ChatHomeService chatHomeService,
ChatHomeServiceWrittenSuccessCallback successCallback,
ChatHomeServiceWrittenFailureCallback failureCallback)
{
- writeChatRoomInfo(
+ return writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
.doOnComplete(() -> successCallback.accept(chatHomeService))
chatHomeService
.getChatRoomData(chatRoomInfo.getId())
.flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+
this::logSuccessChatRoom,
- this::logFailureChatRoom)));
+ this::logFailureChatRoom).subscribe()));
}
- void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- default void writeChatRoomData(
+ default Flux<Message> writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux,
ChatRoomWrittenSuccessCallback successCallback,
ChatRoomWrittenFailureCallback failureCallback)
{
- writeChatRoomData(
+ return writeChatRoomData(
chatRoomId,
messageFlux
.doOnComplete(() -> successCallback.accept(chatRoomId))
.doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
}
- void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatRoomInfoFlux
+ return chatRoomInfoFlux
.log()
.doFirst(() ->
{
{
throw new RuntimeException(e);
}
- })
- .subscribe();
+ });
}
catch (IOException e)
{
}
@Override
- public void writeChatRoomData(
+ public Flux<Message> writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux)
{
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- messageFlux
+ return messageFlux
.log()
.doFirst(() ->
{
{
throw new RuntimeException(e);
}
- })
- .subscribe();
+ });
}
catch (IOException e)
{
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
+import java.util.UUID;
+
@AllArgsConstructor
@NoArgsConstructor
private String id;
private String name;
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(
+ UUID.fromString(id),
+ name);
+ }
+
public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
- chatRoomInfoFlux
+ return chatRoomInfoFlux
.map(ChatRoomTo::from)
.map(chatroomTo -> chatRoomRepository.save(chatroomTo))
- .subscribe();
+ .map(ChatRoomTo::toChatRoomInfo);
}
@Override
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
{
- messageFlux
+ return messageFlux
.map(message -> MessageTo.from(chatRoomId, message))
.map(messageTo -> messageRepository.save(messageTo))
- .subscribe();
+ .map(MessageTo::toMessage);
}
@Override
@Slf4j
public class NoStorageStorageStrategy implements StorageStrategy
{
- @Override
- public void write(ChatHomeService chatHomeService)
+ public Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
{
- log.info("Storage is disabled: Not storing {}", chatHomeService);
+ return Flux
+ .<ChatRoomInfo>empty()
+ .doOnComplete(() -> log.info("Storage is disabled: Not storing {}", chatHomeService));
+
}
- @Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ {
+ return chatRoomInfoFlux;
+ }
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+ public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ return messageFlux;
+ }
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)
protected void stop()
{
- getStorageStrategy().write(chathome);
+ getStorageStrategy()
+ .write(chathome)
+ .subscribe();
}
@Test