From fe53c7a36b46042586f99f931c65079ad73e3ea8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 19 Dec 2024 19:27:09 +0100 Subject: [PATCH] =?utf8?q?Exceptions=20&=20Konfig=20f=C3=BCr=20wiederholba?= =?utf8?q?re=20und=20nicht=20wiederholbare=20Fehler?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 30 +++++++++++++++---- .../de/juplo/kafka/ApplicationProperties.java | 9 ++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 8 ++++- .../kafka/NonRetriableErrorException.java | 9 ++++++ .../java/de/juplo/kafka/RecordHandler.java | 2 +- .../juplo/kafka/RetriableErrorException.java | 9 ++++++ src/main/resources/application.yml | 4 +++ .../de/juplo/kafka/ExampleConsumerTest.java | 16 +++++++++- 8 files changed, 79 insertions(+), 8 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/NonRetriableErrorException.java create mode 100644 src/main/java/de/juplo/kafka/RetriableErrorException.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 107c3424..89129d2b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -10,7 +10,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; +import java.time.Clock; import java.util.Properties; @@ -24,6 +27,8 @@ public class ApplicationConfiguration Consumer kafkaConsumer, RecordHandler recordHandler, ApplicationProperties properties, + Clock clock, + BackOff backOffStrategy, ConfigurableApplicationContext applicationContext) { return @@ -32,15 +37,31 @@ public class ApplicationConfiguration properties.getConsumerProperties().getTopic(), kafkaConsumer, recordHandler, + clock, + properties.getConsumerProperties().getMaxPollInterval(), + properties.getConsumerProperties().getMaxTimePerRecord(), + backOffStrategy, () -> applicationContext.close()); } @Bean - public RecordHandler recordHandler(ApplicationProperties properties) + public RecordHandler recordHandler() { return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); } + @Bean + public BackOff backOffStrategy(ApplicationProperties properties) + { + return new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()); + } + + @Bean + public Clock clock() + { + return Clock.systemDefaultZone(); + } + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { @@ -52,11 +73,10 @@ public class ApplicationConfiguration { props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } + props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); props.put("metadata.maxage.ms", 5000); // 5 Sekunden + props.put("max.poll.interval.ms", (int) properties.getConsumer().getMaxPollInterval().toMillis()); + props.put("max.poll.interval.records", properties.getConsumer().getMaxPollRecords()); props.put("partition.assignment.strategy", StickyAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", LongDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9f..5067d14e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -45,7 +45,16 @@ public class ApplicationProperties @NotEmpty private String topic; private OffsetReset autoOffsetReset; + @NotNull private Duration autoCommitInterval; + @NotNull + private Duration maxPollInterval; + @NotNull + private int maxPollRecords; + @NotNull + private Duration maxTimePerRecord; + @NotNull + private int numRetries; enum OffsetReset { latest, earliest, none } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index dac11ba3..ca0bc2df 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -6,7 +6,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.util.backoff.BackOff; +import java.time.Clock; import java.time.Duration; import java.util.Arrays; @@ -30,6 +32,10 @@ public class ExampleConsumer implements Runnable String topic, Consumer consumer, RecordHandler recordHandler, + Clock clock, + Duration maxPollInterval, + Duration maxTimePerRecord, + BackOff backOffStrategy, Runnable closeCallback) { this.id = clientId; @@ -102,7 +108,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - Long value) + Long value) throws RetriableErrorException, NonRetriableErrorException { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); diff --git a/src/main/java/de/juplo/kafka/NonRetriableErrorException.java b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java new file mode 100644 index 00000000..0eb0ff28 --- /dev/null +++ b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class NonRetriableErrorException extends Exception +{ + public NonRetriableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index a7b65af2..5edcf587 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -7,5 +7,5 @@ public interface RecordHandler Integer partition, Long offset, K key, - V value); + V value) throws RetriableErrorException, NonRetriableErrorException; } diff --git a/src/main/java/de/juplo/kafka/RetriableErrorException.java b/src/main/java/de/juplo/kafka/RetriableErrorException.java new file mode 100644 index 00000000..598ddb09 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RetriableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class RetriableErrorException extends Exception +{ + public RetriableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..21b09965 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,10 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s + max-poll-interval: 5m + max-poll-records: 500 + max-time-per-record: 30s + num-retries: 10 management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 590c9cdf..b480fa38 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -20,7 +20,9 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.backoff.FixedBackOff; +import java.time.Clock; import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -161,7 +163,9 @@ public class ExampleConsumerTest @BeforeEach - void createExampleConsumer(@Autowired ApplicationProperties properties) + void createExampleConsumer( + @Autowired ApplicationProperties properties, + @Autowired Clock clock) { ApplicationConfiguration configuration = new ApplicationConfiguration(); Consumer consumer = configuration.kafkaConsumer(properties); @@ -171,6 +175,10 @@ public class ExampleConsumerTest TOPIC, consumer, mockRecordHandler, + clock, + properties.getConsumerProperties().getMaxPollInterval(), + properties.getConsumerProperties().getMaxTimePerRecord(), + new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()), () -> isTerminatedExceptionally.set(true)); } @@ -249,5 +257,11 @@ public class ExampleConsumerTest properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); return AdminClient.create(properties); } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } } } -- 2.20.1