feat: Made polling-interval for Kafka configurable
authorKai Moritz <kai@juplo.de>
Sun, 25 Feb 2024 19:50:21 +0000 (20:50 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 25 Feb 2024 19:50:21 +0000 (20:50 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index df4d1cd..cfc296b 100644 (file)
@@ -6,6 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 
 import java.net.URI;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.logging.Level;
 
 
@@ -45,6 +46,7 @@ public class ChatBackendProperties
     private String infoChannelTopic = "info_channel";
     private String dataChannelTopic = "data_channel";
     private int numPartitions = 2;
+    private Duration pollingInterval = Duration.ofSeconds(1);
     private String haproxyRuntimeApi = "haproxy:8401";
     private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
   }
index 397b35a..5063da2 100644 (file)
@@ -28,6 +28,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final Consumer<String, AbstractMessageTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
+  private final Duration pollingInterval;
   private final int bufferSize;
   private final Clock clock;
   private final boolean[] isShardOwned;
@@ -49,6 +50,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     Consumer<String, AbstractMessageTo> dataChannelConsumer,
     ZoneId zoneId,
     int numShards,
+    Duration pollingInterval,
     int bufferSize,
     Clock clock,
     InfoChannel infoChannel,
@@ -65,6 +67,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     this.producer = producer;
     this.zoneId = zoneId;
     this.numShards = numShards;
+    this.pollingInterval = pollingInterval;
     this.bufferSize = bufferSize;
     this.clock = clock;
     this.isShardOwned = new boolean[numShards];
@@ -190,7 +193,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
index f5d5253..a6351d0 100644 (file)
@@ -32,6 +32,7 @@ public class InfoChannel implements Runnable
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
+  private final Duration pollingInterval;
   private final int numShards;
   private final String[] shardOwners;
   private final long[] currentOffset;
@@ -48,6 +49,7 @@ public class InfoChannel implements Runnable
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    Duration pollingInterval,
     int numShards,
     URI instanceUri)
   {
@@ -59,6 +61,8 @@ public class InfoChannel implements Runnable
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
 
+    this.pollingInterval = pollingInterval;
+
     this.numShards = numShards;
     this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
@@ -191,7 +195,7 @@ public class InfoChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.debug("Fetched {} messages", records.count());
         for (ConsumerRecord<String, AbstractMessageTo> record : records)
         {
index 5a41ebc..6b7c156 100644 (file)
@@ -130,6 +130,7 @@ public class KafkaServicesConfiguration
         properties.getKafka().getInfoChannelTopic(),
         producer,
         infoChannelConsumer,
+        properties.getKafka().getPollingInterval(),
         properties.getKafka().getNumPartitions(),
         properties.getKafka().getInstanceUri());
   }
@@ -151,6 +152,7 @@ public class KafkaServicesConfiguration
         dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
+        properties.getKafka().getPollingInterval(),
         properties.getChatroomBufferSize(),
         clock,
         infoChannel,