refactor: Refined channel-states, introduced `ChannelState` -- MOVE
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index f5d5253..f3150ce 100644 (file)
@@ -32,12 +32,14 @@ public class InfoChannel implements Runnable
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
+  private final Duration pollingInterval;
   private final int numShards;
   private final String[] shardOwners;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final String instanceUri;
+  private final ChannelMediator channelMediator;
 
   private boolean running;
   @Getter
@@ -48,8 +50,10 @@ public class InfoChannel implements Runnable
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    Duration pollingInterval,
     int numShards,
-    URI instanceUri)
+    URI instanceUri,
+    ChannelMediator channelMediator)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -59,6 +63,8 @@ public class InfoChannel implements Runnable
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
 
+    this.pollingInterval = pollingInterval;
+
     this.numShards = numShards;
     this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
@@ -68,6 +74,8 @@ public class InfoChannel implements Runnable
         .forEach(partition -> this.nextOffset[partition] = -1l);
 
     this.instanceUri = instanceUri.toASCIIString();
+
+    this.channelMediator = channelMediator;
   }
 
 
@@ -191,7 +199,7 @@ public class InfoChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.debug("Fetched {} messages", records.count());
         for (ConsumerRecord<String, AbstractMessageTo> record : records)
         {
@@ -278,6 +286,7 @@ public class InfoChannel implements Runnable
           chatRoomId);
 
       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+      this.channelMediator.chatRoomCreated(chatRoomInfo);
     }
   }