From 72c0c1bc16dd53d7013a6d309a2cb451c8cb5502 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 25 Feb 2024 20:50:21 +0100 Subject: [PATCH] feat: Made polling-interval for Kafka configurable --- .../de/juplo/kafka/chat/backend/ChatBackendProperties.java | 2 ++ .../chat/backend/implementation/kafka/DataChannel.java | 5 ++++- .../chat/backend/implementation/kafka/InfoChannel.java | 6 +++++- .../implementation/kafka/KafkaServicesConfiguration.java | 2 ++ 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index df4d1cd7..cfc296b1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -6,6 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import java.net.URI; import java.nio.file.Paths; +import java.time.Duration; import java.util.logging.Level; @@ -45,6 +46,7 @@ public class ChatBackendProperties private String infoChannelTopic = "info_channel"; private String dataChannelTopic = "data_channel"; private int numPartitions = 2; + private Duration pollingInterval = Duration.ofSeconds(1); private String haproxyRuntimeApi = "haproxy:8401"; private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 397b35af..5063da29 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -28,6 +28,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final Duration pollingInterval; private final int bufferSize; private final Clock clock; private final boolean[] isShardOwned; @@ -49,6 +50,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Consumer dataChannelConsumer, ZoneId zoneId, int numShards, + Duration pollingInterval, int bufferSize, Clock clock, InfoChannel infoChannel, @@ -65,6 +67,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.pollingInterval = pollingInterval; this.bufferSize = bufferSize; this.clock = clock; this.isShardOwned = new boolean[numShards]; @@ -190,7 +193,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); if (loadInProgress) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index f5d5253c..a6351d0f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -32,6 +32,7 @@ public class InfoChannel implements Runnable private final String topic; private final Producer producer; private final Consumer consumer; + private final Duration pollingInterval; private final int numShards; private final String[] shardOwners; private final long[] currentOffset; @@ -48,6 +49,7 @@ public class InfoChannel implements Runnable String topic, Producer producer, Consumer infoChannelConsumer, + Duration pollingInterval, int numShards, URI instanceUri) { @@ -59,6 +61,8 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); + this.pollingInterval = pollingInterval; + this.numShards = numShards; this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; @@ -191,7 +195,7 @@ public class InfoChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.debug("Fetched {} messages", records.count()); for (ConsumerRecord record : records) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 5a41ebcc..6b7c1566 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -130,6 +130,7 @@ public class KafkaServicesConfiguration properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, + properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), properties.getKafka().getInstanceUri()); } @@ -151,6 +152,7 @@ public class KafkaServicesConfiguration dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), + properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), clock, infoChannel, -- 2.20.1