});
}
- Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+ void sendShardAssignedEvent(int shard)
{
EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- Integer.toString(shard),
- to);
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
- producer.send(record, ((metadata, exception) ->
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
{
- if (metadata != null)
- {
- log.info("Successfully sent shard assigned event for shard: {}", shard);
- sink.success(metadata);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send shard assigned event for shard {}: {}",
- shard,
- exception);
- sink.error(exception);
- }
- }));
- });
+ log.info("Successfully sent shard assigned event for shard: {}", shard);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard assigned event for shard {}: {}",
+ shard,
+ exception);
+ }
+ }));
}
- Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+ void sendShardRevokedEvent(int shard)
{
EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- Integer.toString(shard),
- to);
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
- producer.send(record, ((metadata, exception) ->
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
{
- if (metadata != null)
- {
- log.info("Successfully sent shard revoked event for shard: {}", shard);
- sink.success(metadata);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send shard revoked event for shard {}: {}",
- shard,
- exception);
- sink.error(exception);
- }
- }));
- });
+ log.info("Successfully sent shard revoked event for shard: {}", shard);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard revoked event for shard {}: {}",
+ shard,
+ exception);
+ }
+ }));
}