From 03cce870a948a542ca90a01a40b8500acb4299a7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 24 Dec 2024 16:45:49 +0100 Subject: [PATCH] =?utf8?q?Slack=20f=C3=BCr=20Poll-Intervall=20ist=20expliz?= =?utf8?q?it=20konfigurierbar?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 + src/main/java/de/juplo/kafka/ApplicationProperties.java | 2 ++ src/main/java/de/juplo/kafka/ExampleConsumer.java | 3 ++- src/main/resources/application.yml | 1 + src/test/java/de/juplo/kafka/ExampleConsumerTest.java | 3 +++ 5 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a2828355..619e44c8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -41,6 +41,7 @@ public class ApplicationConfiguration properties.getConsumerProperties().getPollRequestTimeout(), properties.getConsumerProperties().getMaxPollInterval(), properties.getConsumerProperties().getMaxTimePerRecord(), + properties.getConsumerProperties().getMinSlackPerPollInterval(), backOffStrategy, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index b2ef86da..ed8f23a5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -56,6 +56,8 @@ public class ApplicationProperties @NotNull private Duration maxTimePerRecord; @NotNull + private Duration minSlackPerPollInterval; + @NotNull private int numRetries; enum OffsetReset { latest, earliest, none } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 504d1b56..21991f87 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -48,6 +48,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable Duration pollRequestTimeout, Duration maxPollInterval, Duration maxTimePerRecord, + Duration minSlackPerPollInterval, BackOff backOffStrategy, Runnable closeCallback) { @@ -58,7 +59,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable this.clock = clock; this.pollRequestTimeout = pollRequestTimeout; this.maxPollInterval = maxPollInterval; - this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2); + this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval); this.backOffStrategy = backOffStrategy; int numPartitions = consumer.partitionsFor(topic).size(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index c59ed1db..554d632b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -10,6 +10,7 @@ juplo: max-poll-interval: 5m max-poll-records: 500 max-time-per-record: 30s + min-slack-per-poll-interval: 1s num-retries: 10 management: endpoint: diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 8cf1e22d..0fe8fff6 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -49,6 +49,7 @@ import static de.juplo.kafka.ExampleConsumerTest.*; "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms", "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms", "juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms", + "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms", "juplo.consumer.num-retries=" + NUM_RETRIES, "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", @@ -429,6 +430,7 @@ public class ExampleConsumerTest static final int POLL_REQUEST_TIMEOUT_MS = 50; static final int MAX_POLL_INTERVALL_MS = 500; static final int MAX_TIME_PER_RECORD_MS = 100; + static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100; @Autowired KafkaTemplate kafkaTemplate; @@ -465,6 +467,7 @@ public class ExampleConsumerTest properties.getConsumerProperties().getPollRequestTimeout(), properties.getConsumerProperties().getMaxPollInterval(), properties.getConsumerProperties().getMaxTimePerRecord(), + properties.getConsumerProperties().getMinSlackPerPollInterval(), backOff, () -> isTerminatedExceptionally.set(true)); } -- 2.20.1