fix: Errors during shard-publishing should not kill the instance
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index e6c18c7..a6351d0 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
@@ -31,6 +32,7 @@ public class InfoChannel implements Runnable
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
+  private final Duration pollingInterval;
   private final int numShards;
   private final String[] shardOwners;
   private final long[] currentOffset;
@@ -47,6 +49,8 @@ public class InfoChannel implements Runnable
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    Duration pollingInterval,
+    int numShards,
     URI instanceUri)
   {
     log.debug(
@@ -57,9 +61,9 @@ public class InfoChannel implements Runnable
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
 
-    this.numShards = consumer
-        .partitionsFor(topic)
-        .size();
+    this.pollingInterval = pollingInterval;
+
+    this.numShards = numShards;
     this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
@@ -191,7 +195,7 @@ public class InfoChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.debug("Fetched {} messages", records.count());
         for (ConsumerRecord<String, AbstractMessageTo> record : records)
         {
@@ -288,6 +292,11 @@ public class InfoChannel implements Runnable
 
   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
+    if (loadInProgress)
+    {
+      return Mono.error(new LoadInProgressException());
+    }
+
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }