- if (metadata != null)
- {
- log.info("Successfully sent shard assigned event for shard: {}", shard);
- sink.success(metadata);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send shard assigned event for shard {}: {}",
- shard,
- exception);
- sink.error(exception);
- }
- }));
- });
+ log.info("Successfully sent shard assigned event for shard: {}", shard);
+ sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard assigned event for shard {}: {}",
+ shard,
+ exception);
+ sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+ }));
+
+ return sink.asMono();