TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index ad03f0d..95f7fb0 100644 (file)
@@ -3,6 +3,10 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
+import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -13,31 +17,43 @@ import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.time.*;
+import java.net.URI;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceUri" })
 @Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
+  private final Duration pollingInterval;
   private final int numShards;
+  private final String[] shardOwners;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final String instanceUri;
+  private final ChannelMediator channelMediator;
 
   private boolean running;
+  @Getter
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> infoChannelConsumer)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    Duration pollingInterval,
+    int numShards,
+    URI instanceUri,
+    ChannelMediator channelMediator)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -47,24 +63,22 @@ public class InfoChannel implements Runnable
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
 
-    this.numShards = consumer
-        .partitionsFor(topic)
-        .size();
+    this.pollingInterval = pollingInterval;
+
+    this.numShards = numShards;
+    this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = -1l);
-  }
 
+    this.instanceUri = instanceUri.toASCIIString();
 
-  boolean loadInProgress()
-  {
-    return IntStream
-        .range(0, numShards)
-        .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
+    this.channelMediator = channelMediator;
   }
 
+
   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
       UUID chatRoomId,
       String name,
@@ -81,17 +95,17 @@ public class InfoChannel implements Runnable
 
       producer.send(record, ((metadata, exception) ->
       {
-        if (metadata != null)
+        if (exception == null)
         {
-          log.info("Successfully sent chreate-request for chat room: {}", to);
-          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
+          log.info("Successfully sent created event for chat chat-room: {}", to);
+          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
           sink.success(chatRoomInfo);
         }
         else
         {
           // On send-failure
           log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
+              "Could not send created event for chat-room (id={}, name={}): {}",
               chatRoomId,
               name,
               exception);
@@ -101,6 +115,70 @@ public class InfoChannel implements Runnable
     });
   }
 
+  void sendShardAssignedEvent(int shard)
+  {
+    EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
+
+    ProducerRecord<String, AbstractMessageTo> record =
+        new ProducerRecord<>(
+            topic,
+            Integer.toString(shard),
+            to);
+
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (metadata != null)
+      {
+        log.info("Successfully sent shard assigned event for shard: {}", shard);
+      }
+      else
+      {
+        // On send-failure
+        log.error(
+            "Could not send shard assigned event for shard {}: {}",
+            shard,
+            exception);
+        // TODO:
+        // Verhalten im Fehlerfall durchdenken!
+        // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
+        // Consumers veranlassen, so dass die nicht öffentlich Bekannte
+        // Zuständigkeit abgegeben und neu zugeordnet wird?
+        // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
+      }
+    }));
+  }
+
+  void sendShardRevokedEvent(int shard)
+  {
+    EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
+
+    ProducerRecord<String, AbstractMessageTo> record =
+        new ProducerRecord<>(
+            topic,
+            Integer.toString(shard),
+            to);
+
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (metadata != null)
+      {
+        log.info("Successfully sent shard revoked event for shard: {}", shard);
+      }
+      else
+      {
+        // On send-failure
+        log.error(
+            "Could not send shard revoked event for shard {}: {}",
+            shard,
+            exception);
+        // TODO:
+        // Verhalten im Fehlerfall durchdenken!
+        // Ggf. einfach egal, da die neue zuständige Instanz den
+        // nicht gelöschten Eintrag eh überschreibt?
+      }
+    }));
+  }
+
 
   @Override
   public void run()
@@ -115,18 +193,24 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = 0l);
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     while (running)
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.debug("Fetched {} messages", records.count());
-        handleMessages(records);
+        for (ConsumerRecord<String, AbstractMessageTo> record : records)
+        {
+          handleMessage(record);
+          updateNextOffset(record.partition(), record.offset() + 1);
+        }
       }
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
@@ -134,27 +218,58 @@ public class InfoChannel implements Runnable
     log.info("Exiting normally");
   }
 
-  private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
+  private void updateNextOffset(int partition, long nextOffset)
   {
-    for (ConsumerRecord<String, AbstractMessageTo> record : records)
+    this.nextOffset[partition] = nextOffset;
+    if (channelState == ChannelState.LOAD_IN_PROGRESS)
     {
-      switch (record.value().getType())
+      boolean loadInProgress = IntStream
+          .range(0, numShards)
+          .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]);
+      if (!loadInProgress)
       {
-        case EVENT_CHATROOM_CREATED:
-          EventChatRoomCreated eventChatRoomCreated =
-              (EventChatRoomCreated) record.value();
-          createChatRoom(eventChatRoomCreated.toChatRoomInfo());
-          break;
-
-        default:
-          log.debug(
-              "Ignoring message for key={} with offset={}: {}",
-              record.key(),
-              record.offset(),
-              record.value());
+        log.info("Loading of info completed! Resuming normal operations...");
+        channelState = ChannelState.READY;
       }
+    }
+  }
+
+  private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
+  {
+    switch (record.value().getType())
+    {
+      case EVENT_CHATROOM_CREATED:
+        EventChatRoomCreated eventChatRoomCreated =
+            (EventChatRoomCreated) record.value();
+        createChatRoom(eventChatRoomCreated.toChatRoomInfo());
+        break;
+
+      case EVENT_SHARD_ASSIGNED:
+        EventShardAssigned eventShardAssigned =
+            (EventShardAssigned) record.value();
+        log.info(
+            "Shard {} was assigned to {}",
+            eventShardAssigned.getShard(),
+            eventShardAssigned.getUri());
+        shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
+        break;
 
-      nextOffset[record.partition()] = record.offset() + 1;
+      case EVENT_SHARD_REVOKED:
+        EventShardRevoked eventShardRevoked =
+            (EventShardRevoked) record.value();
+        log.info(
+            "Shard {} was revoked from {}",
+            eventShardRevoked.getShard(),
+            eventShardRevoked.getUri());
+        shardOwners[eventShardRevoked.getShard()] = null;
+        break;
+
+      default:
+        log.debug(
+            "Ignoring message for key={} with offset={}: {}",
+            record.key(),
+            record.offset(),
+            record.value());
     }
   }
 
@@ -178,6 +293,7 @@ public class InfoChannel implements Runnable
           chatRoomId);
 
       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+      this.channelMediator.chatRoomCreated(chatRoomInfo);
     }
   }
 
@@ -188,6 +304,17 @@ public class InfoChannel implements Runnable
 
   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
+    {
+      return Mono.error(new ChannelNotReadyException(capturedState));
+    }
+
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
+
+  Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
 }