TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index e6c18c7..95f7fb0 100644 (file)
@@ -6,48 +6,54 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatR
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.net.URI;
-import java.time.*;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceUri" })
 @Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
+  private final Duration pollingInterval;
   private final int numShards;
   private final String[] shardOwners;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final String instanceUri;
+  private final ChannelMediator channelMediator;
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress = true;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> infoChannelConsumer,
-    URI instanceUri)
+    Duration pollingInterval,
+    int numShards,
+    URI instanceUri,
+    ChannelMediator channelMediator)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -57,9 +63,9 @@ public class InfoChannel implements Runnable
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
 
-    this.numShards = consumer
-        .partitionsFor(topic)
-        .size();
+    this.pollingInterval = pollingInterval;
+
+    this.numShards = numShards;
     this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
@@ -68,6 +74,8 @@ public class InfoChannel implements Runnable
         .forEach(partition -> this.nextOffset[partition] = -1l);
 
     this.instanceUri = instanceUri.toASCIIString();
+
+    this.channelMediator = channelMediator;
   }
 
 
@@ -185,13 +193,13 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = 0l);
-    loadInProgress = true;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     while (running)
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.debug("Fetched {} messages", records.count());
         for (ConsumerRecord<String, AbstractMessageTo> record : records)
         {
@@ -202,6 +210,7 @@ public class InfoChannel implements Runnable
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
@@ -212,10 +221,16 @@ public class InfoChannel implements Runnable
   private void updateNextOffset(int partition, long nextOffset)
   {
     this.nextOffset[partition] = nextOffset;
-    if (loadInProgress) {
-      loadInProgress = IntStream
+    if (channelState == ChannelState.LOAD_IN_PROGRESS)
+    {
+      boolean loadInProgress = IntStream
           .range(0, numShards)
-          .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+          .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]);
+      if (!loadInProgress)
+      {
+        log.info("Loading of info completed! Resuming normal operations...");
+        channelState = ChannelState.READY;
+      }
     }
   }
 
@@ -278,6 +293,7 @@ public class InfoChannel implements Runnable
           chatRoomId);
 
       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+      this.channelMediator.chatRoomCreated(chatRoomInfo);
     }
   }
 
@@ -288,6 +304,12 @@ public class InfoChannel implements Runnable
 
   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
+    {
+      return Mono.error(new ChannelNotReadyException(capturedState));
+    }
+
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }