From b07b88b6a4128ff86c2fea01098557cad2b3ec04 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Dec 2024 17:27:52 +0100 Subject: [PATCH] =?utf8?q?Timout=20f=C3=BCr=20den=20Poll-Request=20konfigu?= =?utf8?q?rierbar=20gemacht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 + src/main/java/de/juplo/kafka/ApplicationProperties.java | 2 ++ src/main/java/de/juplo/kafka/ExampleConsumer.java | 5 ++++- src/main/resources/application.yml | 1 + src/test/java/de/juplo/kafka/ExampleConsumerTest.java | 3 +++ 5 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 89129d2b..a2828355 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -38,6 +38,7 @@ public class ApplicationConfiguration kafkaConsumer, recordHandler, clock, + properties.getConsumerProperties().getPollRequestTimeout(), properties.getConsumerProperties().getMaxPollInterval(), properties.getConsumerProperties().getMaxTimePerRecord(), backOffStrategy, diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 5067d14e..b2ef86da 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -48,6 +48,8 @@ public class ApplicationProperties @NotNull private Duration autoCommitInterval; @NotNull + private Duration pollRequestTimeout; + @NotNull private Duration maxPollInterval; @NotNull private int maxPollRecords; diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 622b163a..7f9b2c6e 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -28,6 +28,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable private final RecordHandler recordHandler; private final Thread workerThread; private final Clock clock; + private final Duration pollRequestTimeout; private final Duration maxPollInterval; private final Duration minTimeForNextRecord; private final BackOff backOffStrategy; @@ -44,6 +45,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable Consumer consumer, RecordHandler recordHandler, Clock clock, + Duration pollRequestTimeout, Duration maxPollInterval, Duration maxTimePerRecord, BackOff backOffStrategy, @@ -54,6 +56,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable this.consumer = consumer; this.recordHandler = recordHandler; this.clock = clock; + this.pollRequestTimeout = pollRequestTimeout; this.maxPollInterval = maxPollInterval; this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2); this.backOffStrategy = backOffStrategy; @@ -83,7 +86,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable try { ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); + consumer.poll(pollRequestTimeout); log.info("{} - Received {} messages", id, records.count()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 21b09965..c59ed1db 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,7 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s + poll-request-timeout: 1s max-poll-interval: 5m max-poll-records: 500 max-time-per-record: 30s diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index c2534f20..76373f25 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -45,6 +45,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*; }, properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms", "juplo.consumer.num-retries=" + NUM_RETRIES, @@ -337,6 +338,7 @@ public class ExampleConsumerTest static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 10; static final int NUM_RETRIES = 6; + static final int POLL_REQUEST_TIMEOUT_MS = 1000; static final int MAX_POLL_INTERVALL_MS = 5000; static final int ERROR_TIMEOUT_MS = 1000; @@ -368,6 +370,7 @@ public class ExampleConsumerTest consumer, mockRecordHandler, clock, + properties.getConsumerProperties().getPollRequestTimeout(), properties.getConsumerProperties().getMaxPollInterval(), properties.getConsumerProperties().getMaxTimePerRecord(), new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()), -- 2.20.1