+ void sendShardAssignedEvent(int shard)
+ {
+ EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
+
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully sent shard assigned event for shard: {}", shard);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard assigned event for shard {}: {}",
+ shard,
+ exception);
+ // TODO:
+ // Verhalten im Fehlerfall durchdenken!
+ // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
+ // Consumers veranlassen, so dass die nicht öffentlich Bekannte
+ // Zuständigkeit abgegeben und neu zugeordnet wird?
+ // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
+ }
+ }));
+ }
+
+ void sendShardRevokedEvent(int shard)
+ {
+ EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
+
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully sent shard revoked event for shard: {}", shard);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard revoked event for shard {}: {}",
+ shard,
+ exception);
+ // TODO:
+ // Verhalten im Fehlerfall durchdenken!
+ // Ggf. einfach egal, da die neue zuständige Instanz den
+ // nicht gelöschten Eintrag eh überschreibt?
+ }
+ }));
+ }
+