From 409bf8614f1fb53cfb199b2377b1aae9fa3dd433 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 22:58:54 +0100 Subject: [PATCH] WIP --- .../backend/persistence/kafka/KafkaChatRoomService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 1175d55f..79f2e63a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -7,7 +7,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -16,7 +15,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.LinkedHashMap; import java.util.UUID; -import java.util.concurrent.Future; @Slf4j @@ -53,6 +51,7 @@ public class KafkaChatRoomService implements ChatRoomService { if (metadata != null) { + // On successful send Message message = messages.get(key); if (message != null) { @@ -80,6 +79,11 @@ public class KafkaChatRoomService implements ChatRoomService sink.success(); } + else + { + // On send-failure + sink.error(exception); + } })); }); } -- 2.20.1