From 3c68ae6035bf40fddbba5dabf79ea99c69d731ea Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Mar 2024 18:15:32 +0100 Subject: [PATCH] test: HandoverIT-POC - Refactored `TestListener` --- .../kafka/chat/backend/TestListener.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index 95a8ff8f..e825da38 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -28,28 +28,34 @@ public class TestListener .fromArray(chatRooms) .flatMap(chatRoom -> { - log.info("Requesting messages for chat-room {}", chatRoom); List list = new LinkedList<>(); receivedMessages.put(chatRoom.getId(), list); - return receiveMessages(chatRoom) - .flatMap(sse -> - { - try - { - return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class)); - } - catch (Exception e) - { - return Mono.error(e); - } - }) - .doOnNext(message -> list.add(message)) - .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom)) - .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe)); + return receiveMessages(chatRoom); }); } - Flux> receiveMessages(ChatRoomInfoTo chatRoom) + Flux receiveMessages(ChatRoomInfoTo chatRoom) + { + log.info("Requesting messages for chat-room {}", chatRoom); + List list = receivedMessages.get(chatRoom.getId()); + return receiveServerSentEvents(chatRoom) + .flatMap(sse -> + { + try + { + return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class)); + } + catch (Exception e) + { + return Mono.error(e); + } + }) + .doOnNext(message -> list.add(message)) + .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom)) + .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe)); + } + + Flux> receiveServerSentEvents(ChatRoomInfoTo chatRoom) { return webClient .get() -- 2.20.1