WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index 26e8696..4711dbd 100644 (file)
@@ -3,16 +3,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
 import java.time.*;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,9 +31,11 @@ public class InfoChannel implements Runnable
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final int numShards;
+  private final String[] shardOwners;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final String instanceUri;
 
   private boolean running;
 
@@ -37,7 +43,8 @@ public class InfoChannel implements Runnable
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> infoChannelConsumer)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    URI instanceUri)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -50,11 +57,14 @@ public class InfoChannel implements Runnable
     this.numShards = consumer
         .partitionsFor(topic)
         .size();
+    this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = -1l);
+
+    this.instanceUri = instanceUri.toASCIIString();
   }
 
 
@@ -83,7 +93,7 @@ public class InfoChannel implements Runnable
       {
         if (metadata != null)
         {
-          log.info("Successfully sent chreate-request for chat room: {}", to);
+          log.info("Successfully sent created event for chat chat-room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
           sink.success(chatRoomInfo);
         }
@@ -91,7 +101,7 @@ public class InfoChannel implements Runnable
         {
           // On send-failure
           log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
+              "Could not send created event for chat-room (id={}, name={}): {}",
               chatRoomId,
               name,
               exception);
@@ -101,6 +111,70 @@ public class InfoChannel implements Runnable
     });
   }
 
+  Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+  {
+    EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
+
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              Integer.toString(shard),
+              to);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully sent shard assigned event for shard: {}", shard);
+          sink.success(metadata);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send shard assigned event for shard {}: {}",
+              shard,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+  {
+    EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
+
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              Integer.toString(shard),
+              to);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully sent shard revoked event for shard: {}", shard);
+          sink.success(metadata);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send shard revoked event for shard {}: {}",
+              shard,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
 
   @Override
   public void run()
@@ -146,6 +220,26 @@ public class InfoChannel implements Runnable
           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
           break;
 
+        case EVENT_SHARD_ASSIGNED:
+          EventShardAssigned eventShardAssigned =
+              (EventShardAssigned) record.value();
+          log.info(
+              "Shard {} was assigned to {}",
+              eventShardAssigned.getShard(),
+              eventShardAssigned.getUri());
+          shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
+          break;
+
+        case EVENT_SHARD_REVOKED:
+          EventShardRevoked eventShardRevoked =
+              (EventShardRevoked) record.value();
+          log.info(
+              "Shard {} was revoked from {}",
+              eventShardRevoked.getShard(),
+              eventShardRevoked.getUri());
+          shardOwners[eventShardRevoked.getShard()] = null;
+          break;
+
         default:
           log.debug(
               "Ignoring message for key={} with offset={}: {}",
@@ -190,4 +284,9 @@ public class InfoChannel implements Runnable
   {
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
+
+  Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
 }