import reactor.core.publisher.Flux;
import java.util.UUID;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
default void write(ChatHomeService chatHomeService)
{
+ Phaser writtenChatRooms = new Phaser(1);
+ AtomicInteger numErrors = new AtomicInteger();
+
writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
- .doOnNext(chatRoomInfo ->
- writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+ .doOnNext(chatRoomInfo -> writtenChatRooms.register())
+ .doOnNext(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+ (chatRoomId) ->
+ {
+ logSuccess(chatRoomId);
+ writtenChatRooms.arriveAndDeregister();
+ },
+ (chatRoomId, throwable) ->
+ {
+ logFailure(chatRoomId, throwable);
+ numErrors.incrementAndGet();
+ writtenChatRooms.arriveAndDeregister();
+ })));
+
+ writtenChatRooms.arriveAndAwaitAdvance();
+ if (numErrors.get() > 0)
+ {
+ throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
+ }
+
+ log.info("All chat-rooms were written successfully for {}", chatHomeService);
}
default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
writeChatRoomData(
chatRoomId,
messageFlux,
- (id) -> log.info("Successfully stored chat-room {}", id),
- (id, throwable) -> log.error("Could not store chat-room {}: {}", id, throwable));
+ (id) -> logSuccess(id),
+ (id, throwable) -> logFailure(id, throwable));
}
void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
interface SuccessCallback extends Consumer<UUID> {}
interface FailureCallback extends BiConsumer<UUID, Throwable> {}
+
+ default void logSuccess(UUID chatRoomId)
+ {
+ log.info("Successfully stored chat-room {}", chatRoomId);
+ }
+
+ default void logFailure(UUID chatRoomId, Throwable throwable)
+ {
+ log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
+ }
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
.expectStatus().isOk()
.expectBody().jsonPath("$.status").isEqualTo("UP");
});
+
}
+ @AfterAll
+ static void waitForCompletionOfStorage(
+ @Autowired StorageStrategy storageStrategy,
+ @Autowired ChatHomeService chatHomeService)
+ {
+ storageStrategy.write(chatHomeService);
+ }
+
+
@Test
@DisplayName("Restored chat-rooms can be listed")
void testRestoredChatRoomsCanBeListed()