From: Kai Moritz Date: Sun, 22 Dec 2024 16:27:52 +0000 (+0100) Subject: Timout für den Poll-Request konfigurierbar gemacht X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=e6f088c26baecdef1d890997b3846415a4437010;p=demos%2Fkafka%2Ftraining Timout für den Poll-Request konfigurierbar gemacht --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 89129d2b..a2828355 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -38,6 +38,7 @@ public class ApplicationConfiguration kafkaConsumer, recordHandler, clock, + properties.getConsumerProperties().getPollRequestTimeout(), properties.getConsumerProperties().getMaxPollInterval(), properties.getConsumerProperties().getMaxTimePerRecord(), backOffStrategy, diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 5067d14e..b2ef86da 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -48,6 +48,8 @@ public class ApplicationProperties @NotNull private Duration autoCommitInterval; @NotNull + private Duration pollRequestTimeout; + @NotNull private Duration maxPollInterval; @NotNull private int maxPollRecords; diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 38b848e1..7021fa04 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -28,6 +28,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable private final RecordHandler recordHandler; private final Thread workerThread; private final Clock clock; + private final Duration pollRequestTimeout; private final Duration maxPollInterval; private final Duration minTimeForNextRecord; private final BackOff backOffStrategy; @@ -44,6 +45,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable Consumer consumer, RecordHandler recordHandler, Clock clock, + Duration pollRequestTimeout, Duration maxPollInterval, Duration maxTimePerRecord, BackOff backOffStrategy, @@ -54,6 +56,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable this.consumer = consumer; this.recordHandler = recordHandler; this.clock = clock; + this.pollRequestTimeout = pollRequestTimeout; this.maxPollInterval = maxPollInterval; this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2); this.backOffStrategy = backOffStrategy; @@ -83,7 +86,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable try { ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); + consumer.poll(pollRequestTimeout); log.info("{} - Received {} messages", id, records.count()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 21b09965..c59ed1db 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,7 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s + poll-request-timeout: 1s max-poll-interval: 5m max-poll-records: 500 max-time-per-record: 30s diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index c2534f20..76373f25 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -45,6 +45,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*; }, properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms", "juplo.consumer.num-retries=" + NUM_RETRIES, @@ -337,6 +338,7 @@ public class ExampleConsumerTest static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 10; static final int NUM_RETRIES = 6; + static final int POLL_REQUEST_TIMEOUT_MS = 1000; static final int MAX_POLL_INTERVALL_MS = 5000; static final int ERROR_TIMEOUT_MS = 1000; @@ -368,6 +370,7 @@ public class ExampleConsumerTest consumer, mockRecordHandler, clock, + properties.getConsumerProperties().getPollRequestTimeout(), properties.getConsumerProperties().getMaxPollInterval(), properties.getConsumerProperties().getMaxTimePerRecord(), new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()),