WIP: shard assigned/revoked events
authorKai Moritz <kai@juplo.de>
Sun, 28 Jan 2024 17:58:53 +0000 (18:58 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 28 Jan 2024 17:58:53 +0000 (18:58 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java

index 2b98629..f28a1e7 100644 (file)
@@ -111,68 +111,58 @@ public class InfoChannel implements Runnable
     });
   }
 
-  Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+  void sendShardAssignedEvent(int shard)
   {
     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
 
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              Integer.toString(shard),
-              to);
+    ProducerRecord<String, AbstractMessageTo> record =
+        new ProducerRecord<>(
+            topic,
+            Integer.toString(shard),
+            to);
 
-      producer.send(record, ((metadata, exception) ->
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (metadata != null)
       {
-        if (metadata != null)
-        {
-          log.info("Successfully sent shard assigned event for shard: {}", shard);
-          sink.success(metadata);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send shard assigned event for shard {}: {}",
-              shard,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
+        log.info("Successfully sent shard assigned event for shard: {}", shard);
+      }
+      else
+      {
+        // On send-failure
+        log.error(
+            "Could not send shard assigned event for shard {}: {}",
+            shard,
+            exception);
+      }
+    }));
   }
 
-  Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+  void sendShardRevokedEvent(int shard)
   {
     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
 
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              Integer.toString(shard),
-              to);
+    ProducerRecord<String, AbstractMessageTo> record =
+        new ProducerRecord<>(
+            topic,
+            Integer.toString(shard),
+            to);
 
-      producer.send(record, ((metadata, exception) ->
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (metadata != null)
       {
-        if (metadata != null)
-        {
-          log.info("Successfully sent shard revoked event for shard: {}", shard);
-          sink.success(metadata);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send shard revoked event for shard {}: {}",
-              shard,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
+        log.info("Successfully sent shard revoked event for shard: {}", shard);
+      }
+      else
+      {
+        // On send-failure
+        log.error(
+            "Could not send shard revoked event for shard {}: {}",
+            shard,
+            exception);
+      }
+    }));
   }