WIP: Mono durch Sink.unsafe().one() ersetzt
authorKai Moritz <kai@juplo.de>
Sat, 16 Sep 2023 21:05:52 +0000 (23:05 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 15:04:43 +0000 (16:04 +0100)
--
TODO: Cold vs. Hot Sinks noch man neu recherchieren...

src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java

index 4711dbd..f08a8d8 100644 (file)
@@ -15,6 +15,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
 
 import java.net.URI;
 import java.time.*;
@@ -115,64 +116,66 @@ public class InfoChannel implements Runnable
   {
     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
 
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              Integer.toString(shard),
-              to);
+    Sinks.One sink = Sinks.unsafe().one();
 
-      producer.send(record, ((metadata, exception) ->
+    ProducerRecord<String, AbstractMessageTo> record =
+        new ProducerRecord<>(
+            topic,
+            Integer.toString(shard),
+            to);
+
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (metadata != null)
       {
-        if (metadata != null)
-        {
-          log.info("Successfully sent shard assigned event for shard: {}", shard);
-          sink.success(metadata);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send shard assigned event for shard {}: {}",
-              shard,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
+        log.info("Successfully sent shard assigned event for shard: {}", shard);
+        sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
+      }
+      else
+      {
+        // On send-failure
+        log.error(
+            "Could not send shard assigned event for shard {}: {}",
+            shard,
+            exception);
+        sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
+      }
+    }));
+
+    return sink.asMono();
   }
 
   Mono<RecordMetadata> sendShardRevokedEvent(int shard)
   {
     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
 
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              Integer.toString(shard),
-              to);
+    Sinks.One sink = Sinks.unsafe().one();
 
-      producer.send(record, ((metadata, exception) ->
+    ProducerRecord<String, AbstractMessageTo> record =
+        new ProducerRecord<>(
+            topic,
+            Integer.toString(shard),
+            to);
+
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (metadata != null)
       {
-        if (metadata != null)
-        {
-          log.info("Successfully sent shard revoked event for shard: {}", shard);
-          sink.success(metadata);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send shard revoked event for shard {}: {}",
-              shard,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
+        log.info("Successfully sent shard revoked event for shard: {}", shard);
+        sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
+      }
+      else
+      {
+        // On send-failure
+        log.error(
+            "Could not send shard revoked event for shard {}: {}",
+            shard,
+            exception);
+        sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
+      }
+    }));
+
+    return sink.asMono();
   }