From: Kai Moritz Date: Sat, 16 Sep 2023 21:05:52 +0000 (+0200) Subject: WIP:hot X-Git-Tag: rebase--2024-01-26--18-11~15 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2239e67fdb857ec01dbcd80f246fe5453eec50a3;p=demos%2Fkafka%2Fchat WIP:hot --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 94e049bb..ec5c7f5f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -37,7 +37,7 @@ public class ChatBackendProperties public static class KafkaServicesProperties { private URI instanceUri = URI.create("http://localhost:8080"); - private String clientIdPrefix; + private String clientIdPrefix = "DEV"; private String bootstrapServers = ":9092"; private String infoChannelTopic = "info_channel"; private String dataChannelTopic = "data_channel"; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 4711dbd9..f08a8d85 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -15,6 +15,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import java.net.URI; import java.time.*; @@ -115,64 +116,66 @@ public class InfoChannel implements Runnable { EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Integer.toString(shard), - to); + Sinks.One sink = Sinks.unsafe().one(); - producer.send(record, ((metadata, exception) -> + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) { - if (metadata != null) - { - log.info("Successfully sent shard assigned event for shard: {}", shard); - sink.success(metadata); - } - else - { - // On send-failure - log.error( - "Could not send shard assigned event for shard {}: {}", - shard, - exception); - sink.error(exception); - } - })); - }); + log.info("Successfully sent shard assigned event for shard: {}", shard); + sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST); + } + })); + + return sink.asMono(); } Mono sendShardRevokedEvent(int shard) { EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Integer.toString(shard), - to); + Sinks.One sink = Sinks.unsafe().one(); - producer.send(record, ((metadata, exception) -> + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) { - if (metadata != null) - { - log.info("Successfully sent shard revoked event for shard: {}", shard); - sink.success(metadata); - } - else - { - // On send-failure - log.error( - "Could not send shard revoked event for shard {}: {}", - shard, - exception); - sink.error(exception); - } - })); - }); + log.info("Successfully sent shard revoked event for shard: {}", shard); + sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST); + } + })); + + return sink.asMono(); }