WIP:haproxy
authorKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:52:00 +0000 (11:52 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:52:00 +0000 (11:52 +0200)
src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 59fd80c..9a1c725 100644 (file)
@@ -5,5 +5,5 @@ import reactor.core.publisher.Mono;
 
 public interface ShardingPublisherStrategy
 {
-  Mono<String[]> publishOwnership(int shard);
+  Mono<String> publishOwnership(int shard);
 }
index e0d42d4..114619e 100644 (file)
@@ -142,7 +142,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
       infoChannel.sendShardAssignedEvent(partition);
-      shardingPublisherStrategy.publishOwnership(partition);
+      shardingPublisherStrategy
+          .publishOwnership(partition)
+          .map(instanceId -> "Instance "
+              + instanceId
+              + " was published as owner of shard "
+              + partition)
+          .doOnError(e -> log.error("Could not publish ownershit for shard " + partition + ": " + e))
+          .subscribe();
     });
 
     consumer.resume(partitions);