WIP:fix:activation
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index f3150ce..95f7fb0 100644 (file)
@@ -1,33 +1,33 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.net.URI;
-import java.time.*;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceUri" })
 @Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
@@ -43,7 +43,7 @@ public class InfoChannel implements Runnable
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress = true;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public InfoChannel(
@@ -193,7 +193,7 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = 0l);
-    loadInProgress = true;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     while (running)
     {
@@ -210,6 +210,7 @@ public class InfoChannel implements Runnable
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
@@ -220,10 +221,16 @@ public class InfoChannel implements Runnable
   private void updateNextOffset(int partition, long nextOffset)
   {
     this.nextOffset[partition] = nextOffset;
-    if (loadInProgress) {
-      loadInProgress = IntStream
+    if (channelState == ChannelState.LOAD_IN_PROGRESS)
+    {
+      boolean loadInProgress = IntStream
           .range(0, numShards)
-          .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+          .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]);
+      if (!loadInProgress)
+      {
+        log.info("Loading of info completed! Resuming normal operations...");
+        channelState = ChannelState.READY;
+      }
     }
   }
 
@@ -297,9 +304,10 @@ public class InfoChannel implements Runnable
 
   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    if (loadInProgress)
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
     {
-      return Mono.error(new LoadInProgressException());
+      return Mono.error(new ChannelNotReadyException(capturedState));
     }
 
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));