WIP:haproxy
authorKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:57:38 +0000 (11:57 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:57:38 +0000 (11:57 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 7835b18..5991709 100644 (file)
@@ -18,7 +18,7 @@ public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrate
 
 
   @Override
-  public Mono<String[]> publishOwnership(int shard)
+  public Mono<String> publishOwnership(int shard)
   {
     return Mono.error(new RuntimeException("TODO"));
   }
index 114619e..1a2f1b0 100644 (file)
@@ -144,11 +144,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       infoChannel.sendShardAssignedEvent(partition);
       shardingPublisherStrategy
           .publishOwnership(partition)
-          .map(instanceId -> "Instance "
-              + instanceId
-              + " was published as owner of shard "
-              + partition)
-          .doOnError(e -> log.error("Could not publish ownershit for shard " + partition + ": " + e))
+          .doOnNext(instanceId -> log.info(
+              "Instance {} was published as owner of shard {}",
+              instanceId,
+              partition))
           .subscribe();
     });