From: Kai Moritz <kai@juplo.de>
Date: Sun, 22 Dec 2024 16:27:52 +0000 (+0100)
Subject: Timout für den Poll-Request konfigurierbar gemacht
X-Git-Tag: consumer/spring-consumer--error-handling--COMMITS--2025-02~30
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=781967d1703858515e110a2e3b5518c7e2fb49b7;p=demos%2Fkafka%2Ftraining

Timout für den Poll-Request konfigurierbar gemacht
---

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 89129d2b..a2828355 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -38,6 +38,7 @@ public class ApplicationConfiguration
             kafkaConsumer,
             recordHandler,
             clock,
+            properties.getConsumerProperties().getPollRequestTimeout(),
             properties.getConsumerProperties().getMaxPollInterval(),
             properties.getConsumerProperties().getMaxTimePerRecord(),
             backOffStrategy,
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index 5067d14e..b2ef86da 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -48,6 +48,8 @@ public class ApplicationProperties
     @NotNull
     private Duration autoCommitInterval;
     @NotNull
+    private Duration pollRequestTimeout;
+    @NotNull
     private Duration maxPollInterval;
     @NotNull
     private int maxPollRecords;
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 622b163a..7f9b2c6e 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -28,6 +28,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
   private final RecordHandler<String, Long> recordHandler;
   private final Thread workerThread;
   private final Clock clock;
+  private final Duration pollRequestTimeout;
   private final Duration maxPollInterval;
   private final Duration minTimeForNextRecord;
   private final BackOff backOffStrategy;
@@ -44,6 +45,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     Consumer<String, Long> consumer,
     RecordHandler<String, Long> recordHandler,
     Clock clock,
+    Duration pollRequestTimeout,
     Duration maxPollInterval,
     Duration maxTimePerRecord,
     BackOff backOffStrategy,
@@ -54,6 +56,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     this.consumer = consumer;
     this.recordHandler = recordHandler;
     this.clock = clock;
+    this.pollRequestTimeout = pollRequestTimeout;
     this.maxPollInterval = maxPollInterval;
     this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
     this.backOffStrategy = backOffStrategy;
@@ -83,7 +86,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
         try
         {
           ConsumerRecords<String, Long> records =
-            consumer.poll(Duration.ofSeconds(1));
+            consumer.poll(pollRequestTimeout);
 
           log.info("{} - Received {} messages", id, records.count());
 
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 21b09965..c59ed1db 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -6,6 +6,7 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+    poll-request-timeout: 1s
     max-poll-interval: 5m
     max-poll-records: 500
     max-time-per-record: 30s
diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
index c2534f20..76373f25 100644
--- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
+++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
@@ -45,6 +45,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*;
   },
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
     "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
     "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms",
     "juplo.consumer.num-retries=" + NUM_RETRIES,
@@ -337,6 +338,7 @@ public class ExampleConsumerTest
   static final String TOPIC = "ExampleConsumerTest_TEST";
   static final int NUM_PARTITIONS = 10;
   static final int NUM_RETRIES = 6;
+  static final int POLL_REQUEST_TIMEOUT_MS = 1000;
   static final int MAX_POLL_INTERVALL_MS = 5000;
   static final int ERROR_TIMEOUT_MS = 1000;
 
@@ -368,6 +370,7 @@ public class ExampleConsumerTest
       consumer,
       mockRecordHandler,
       clock,
+      properties.getConsumerProperties().getPollRequestTimeout(),
       properties.getConsumerProperties().getMaxPollInterval(),
       properties.getConsumerProperties().getMaxTimePerRecord(),
       new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()),