From: Kai Moritz Date: Wed, 25 Jan 2023 21:58:54 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d160bd5cb1ac93bdc8e5db40f5d69f722627af5b;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 1175d55f..79f2e63a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -7,7 +7,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -16,7 +15,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.LinkedHashMap; import java.util.UUID; -import java.util.concurrent.Future; @Slf4j @@ -53,6 +51,7 @@ public class KafkaChatRoomService implements ChatRoomService { if (metadata != null) { + // On successful send Message message = messages.get(key); if (message != null) { @@ -80,6 +79,11 @@ public class KafkaChatRoomService implements ChatRoomService sink.success(); } + else + { + // On send-failure + sink.error(exception); + } })); }); }