WIP
authorKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:58:54 +0000 (22:58 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 11:14:59 +0000 (12:14 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index 1175d55..79f2e63 100644 (file)
@@ -7,7 +7,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -16,7 +15,6 @@ import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
 import java.util.UUID;
-import java.util.concurrent.Future;
 
 
 @Slf4j
@@ -53,6 +51,7 @@ public class KafkaChatRoomService implements ChatRoomService
       {
         if (metadata != null)
         {
+          // On successful send
           Message message = messages.get(key);
           if (message != null)
           {
@@ -80,6 +79,11 @@ public class KafkaChatRoomService implements ChatRoomService
 
           sink.success();
         }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
       }));
     });
   }