Timout für den Poll-Request konfigurierbar gemacht
authorKai Moritz <kai@juplo.de>
Sun, 22 Dec 2024 16:27:52 +0000 (17:27 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jan 2025 09:49:08 +0000 (10:49 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

index 89129d2..a282835 100644 (file)
@@ -38,6 +38,7 @@ public class ApplicationConfiguration
             kafkaConsumer,
             recordHandler,
             clock,
+            properties.getConsumerProperties().getPollRequestTimeout(),
             properties.getConsumerProperties().getMaxPollInterval(),
             properties.getConsumerProperties().getMaxTimePerRecord(),
             backOffStrategy,
index 5067d14..b2ef86d 100644 (file)
@@ -48,6 +48,8 @@ public class ApplicationProperties
     @NotNull
     private Duration autoCommitInterval;
     @NotNull
+    private Duration pollRequestTimeout;
+    @NotNull
     private Duration maxPollInterval;
     @NotNull
     private int maxPollRecords;
index 38b848e..7021fa0 100644 (file)
@@ -28,6 +28,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
   private final RecordHandler<String, Long> recordHandler;
   private final Thread workerThread;
   private final Clock clock;
+  private final Duration pollRequestTimeout;
   private final Duration maxPollInterval;
   private final Duration minTimeForNextRecord;
   private final BackOff backOffStrategy;
@@ -44,6 +45,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     Consumer<String, Long> consumer,
     RecordHandler<String, Long> recordHandler,
     Clock clock,
+    Duration pollRequestTimeout,
     Duration maxPollInterval,
     Duration maxTimePerRecord,
     BackOff backOffStrategy,
@@ -54,6 +56,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     this.consumer = consumer;
     this.recordHandler = recordHandler;
     this.clock = clock;
+    this.pollRequestTimeout = pollRequestTimeout;
     this.maxPollInterval = maxPollInterval;
     this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
     this.backOffStrategy = backOffStrategy;
@@ -83,7 +86,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
         try
         {
           ConsumerRecords<String, Long> records =
-            consumer.poll(Duration.ofSeconds(1));
+            consumer.poll(pollRequestTimeout);
 
           log.info("{} - Received {} messages", id, records.count());
 
index 21b0996..c59ed1d 100644 (file)
@@ -6,6 +6,7 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+    poll-request-timeout: 1s
     max-poll-interval: 5m
     max-poll-records: 500
     max-time-per-record: 30s
index c2534f2..76373f2 100644 (file)
@@ -45,6 +45,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*;
   },
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
     "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
     "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms",
     "juplo.consumer.num-retries=" + NUM_RETRIES,
@@ -337,6 +338,7 @@ public class ExampleConsumerTest
   static final String TOPIC = "ExampleConsumerTest_TEST";
   static final int NUM_PARTITIONS = 10;
   static final int NUM_RETRIES = 6;
+  static final int POLL_REQUEST_TIMEOUT_MS = 1000;
   static final int MAX_POLL_INTERVALL_MS = 5000;
   static final int ERROR_TIMEOUT_MS = 1000;
 
@@ -368,6 +370,7 @@ public class ExampleConsumerTest
       consumer,
       mockRecordHandler,
       clock,
+      properties.getConsumerProperties().getPollRequestTimeout(),
       properties.getConsumerProperties().getMaxPollInterval(),
       properties.getConsumerProperties().getMaxTimePerRecord(),
       new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()),