Slack für Poll-Intervall ist explizit konfigurierbar
authorKai Moritz <kai@juplo.de>
Tue, 24 Dec 2024 15:45:49 +0000 (16:45 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jan 2025 11:26:36 +0000 (12:26 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

index a282835..619e44c 100644 (file)
@@ -41,6 +41,7 @@ public class ApplicationConfiguration
             properties.getConsumerProperties().getPollRequestTimeout(),
             properties.getConsumerProperties().getMaxPollInterval(),
             properties.getConsumerProperties().getMaxTimePerRecord(),
+            properties.getConsumerProperties().getMinSlackPerPollInterval(),
             backOffStrategy,
             () -> applicationContext.close());
   }
index b2ef86d..ed8f23a 100644 (file)
@@ -56,6 +56,8 @@ public class ApplicationProperties
     @NotNull
     private Duration maxTimePerRecord;
     @NotNull
+    private Duration minSlackPerPollInterval;
+    @NotNull
     private int numRetries;
 
     enum OffsetReset { latest, earliest, none }
index 504d1b5..21991f8 100644 (file)
@@ -48,6 +48,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     Duration pollRequestTimeout,
     Duration maxPollInterval,
     Duration maxTimePerRecord,
+    Duration minSlackPerPollInterval,
     BackOff backOffStrategy,
     Runnable closeCallback)
   {
@@ -58,7 +59,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     this.clock = clock;
     this.pollRequestTimeout = pollRequestTimeout;
     this.maxPollInterval = maxPollInterval;
-    this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
+    this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval);
     this.backOffStrategy = backOffStrategy;
 
     int numPartitions = consumer.partitionsFor(topic).size();
index c59ed1d..554d632 100644 (file)
@@ -10,6 +10,7 @@ juplo:
     max-poll-interval: 5m
     max-poll-records: 500
     max-time-per-record: 30s
+    min-slack-per-poll-interval: 1s
     num-retries: 10
 management:
   endpoint:
index 8cf1e22..0fe8fff 100644 (file)
@@ -49,6 +49,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*;
     "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
     "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
     "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+    "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
     "juplo.consumer.num-retries=" + NUM_RETRIES,
     "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
@@ -429,6 +430,7 @@ public class ExampleConsumerTest
   static final int POLL_REQUEST_TIMEOUT_MS = 50;
   static final int MAX_POLL_INTERVALL_MS = 500;
   static final int MAX_TIME_PER_RECORD_MS = 100;
+  static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
 
   @Autowired
   KafkaTemplate<String, byte[]> kafkaTemplate;
@@ -465,6 +467,7 @@ public class ExampleConsumerTest
       properties.getConsumerProperties().getPollRequestTimeout(),
       properties.getConsumerProperties().getMaxPollInterval(),
       properties.getConsumerProperties().getMaxTimePerRecord(),
+      properties.getConsumerProperties().getMinSlackPerPollInterval(),
       backOff,
       () -> isTerminatedExceptionally.set(true));
   }