refactor: Refined success/error-handling when publishing shard-ownership
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 9cafbaa..99beb43 100644 (file)
@@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
   private final InfoChannel infoChannel;
+  private final ShardingPublisherStrategy shardingPublisherStrategy;
 
   private boolean running;
   @Getter
@@ -53,7 +54,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     int numShards,
     int bufferSize,
     Clock clock,
-    InfoChannel infoChannel)
+    InfoChannel infoChannel,
+    ShardingPublisherStrategy shardingPublisherStrategy)
   {
     log.debug(
         "{}: Creating DataChannel for topic {} with {} partitions",
@@ -76,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .range(0, numShards)
         .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
     this.infoChannel = infoChannel;
+    this.shardingPublisherStrategy = shardingPublisherStrategy;
   }
 
 
@@ -142,6 +145,18 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
       infoChannel.sendShardAssignedEvent(partition);
+      shardingPublisherStrategy
+          .publishOwnership(partition)
+          .doOnSuccess(instanceId -> log.info(
+              "Successfully published instance {} as owner of shard {}",
+              instanceId,
+              partition))
+          .doOnError(throwable -> log.error(
+              "Could not publish instance {} as owner of shard {}: {}",
+              instanceId,
+              partition,
+              throwable))
+          .block();
     });
 
     consumer.resume(partitions);