From f7d262315d1d5b985466bb334a84339f49463e38 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 28 Jan 2024 18:58:53 +0100 Subject: [PATCH] WIP: shard assigned/revoked events --- .../implementation/kafka/InfoChannel.java | 90 +++++++++---------- 1 file changed, 40 insertions(+), 50 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 2b986296..f28a1e7d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -111,68 +111,58 @@ public class InfoChannel implements Runnable }); } - Mono sendShardAssignedEvent(int shard) + void sendShardAssignedEvent(int shard) { EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Integer.toString(shard), - to); + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); - producer.send(record, ((metadata, exception) -> + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) { - if (metadata != null) - { - log.info("Successfully sent shard assigned event for shard: {}", shard); - sink.success(metadata); - } - else - { - // On send-failure - log.error( - "Could not send shard assigned event for shard {}: {}", - shard, - exception); - sink.error(exception); - } - })); - }); + log.info("Successfully sent shard assigned event for shard: {}", shard); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + } + })); } - Mono sendShardRevokedEvent(int shard) + void sendShardRevokedEvent(int shard) { EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Integer.toString(shard), - to); + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); - producer.send(record, ((metadata, exception) -> + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) { - if (metadata != null) - { - log.info("Successfully sent shard revoked event for shard: {}", shard); - sink.success(metadata); - } - else - { - // On send-failure - log.error( - "Could not send shard revoked event for shard {}: {}", - shard, - exception); - sink.error(exception); - } - })); - }); + log.info("Successfully sent shard revoked event for shard: {}", shard); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + } + })); } -- 2.20.1