Phaser writtenChatRooms = new Phaser(1);
AtomicInteger numErrors = new AtomicInteger();
- writeChatRoomInfo(
+ Flux<ChatRoomInfo> chatRoomInfoFlux =
chatHomeService
.getChatRoomInfo()
.doOnNext(chatRoomInfo -> writtenChatRooms.register())
logFailure(chatRoomId, throwable);
numErrors.incrementAndGet();
writtenChatRooms.arriveAndDeregister();
- })));
+ }));
+
+ writeChatRoomInfo(chatRoomInfoFlux);
writtenChatRooms.arriveAndAwaitAdvance();
if (numErrors.get() > 0)
throw new RuntimeException(e);
}
})
- .doOnTerminate(() ->
+ .doOnNext(chatRoomInfo ->
{
try
{
- generator.writeEndArray();
- generator.close();
+ ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+ generator.writeObject(chatRoomInfoTo);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
})
- .subscribe(chatRoomInfo ->
+ .doOnTerminate(() ->
{
try
{
- ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
- generator.writeObject(chatRoomInfoTo);
+ generator.writeEndArray();
+ generator.close();
}
catch (IOException e)
{