Exceptions & config for retriable and non-retriable errors
author     Kai Moritz <kai@juplo.de>
Thu, 19 Dec 2024 18:27:09 +0000 (19:27 +0100)
committer  Kai Moritz <kai@juplo.de>
Tue, 24 Dec 2024 16:07:10 +0000 (17:07 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/java/de/juplo/kafka/NonRetriableErrorException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/RecordHandler.java
src/main/java/de/juplo/kafka/RetriableErrorException.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 107c342..89129d2 100644
@@ -10,7 +10,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
 
+import java.time.Clock;
 import java.util.Properties;
 
 
@@ -24,6 +27,8 @@ public class ApplicationConfiguration
       Consumer<String, Long> kafkaConsumer,
       RecordHandler<String, Long> recordHandler,
       ApplicationProperties properties,
+      Clock clock,
+      BackOff backOffStrategy,
       ConfigurableApplicationContext applicationContext)
   {
     return
@@ -32,15 +37,31 @@ public class ApplicationConfiguration
             properties.getConsumerProperties().getTopic(),
             kafkaConsumer,
             recordHandler,
+            clock,
+            properties.getConsumerProperties().getMaxPollInterval(),
+            properties.getConsumerProperties().getMaxTimePerRecord(),
+            backOffStrategy,
             () -> applicationContext.close());
   }
 
   @Bean
-  public RecordHandler<String, Long> recordHandler(ApplicationProperties properties)
+  public RecordHandler<String, Long> recordHandler()
   {
     return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
   }
 
+  @Bean
+  public BackOff backOffStrategy(ApplicationProperties properties)
+  {
+    return new FixedBackOff(0L, properties.getConsumerProperties().getNumRetries());
+  }
+
+  @Bean
+  public Clock clock()
+  {
+    return Clock.systemDefaultZone();
+  }
+
   @Bean(destroyMethod = "")
   public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
   {
@@ -52,11 +73,10 @@ public class ApplicationConfiguration
     {
       props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
     }
-    if (properties.getConsumerProperties().getAutoCommitInterval() != null)
-    {
-      props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
-    }
+    props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     props.put("metadata.maxage.ms", 5000); //  5 Sekunden
+    props.put("max.poll.interval.ms", (int) properties.getConsumer().getMaxPollInterval().toMillis());
+    props.put("max.poll.interval.records", properties.getConsumer().getMaxPollRecords());
     props.put("partition.assignment.strategy", StickyAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", LongDeserializer.class.getName());
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c8193c9..5067d14 100644
@@ -45,7 +45,16 @@ public class ApplicationProperties
     @NotEmpty
     private String topic;
     private OffsetReset autoOffsetReset;
+    @NotNull
     private Duration autoCommitInterval;
+    @NotNull
+    private Duration maxPollInterval;
+    @NotNull
+    private Integer maxPollRecords;
+    @NotNull
+    private Duration maxTimePerRecord;
+    @NotNull
+    private Integer numRetries;
 
     enum OffsetReset { latest, earliest, none }
   }
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 149395a..16a81a1 100644
@@ -6,7 +6,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.util.backoff.BackOff;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.util.Arrays;
 
@@ -30,6 +32,10 @@ public class ExampleConsumer implements Runnable
     String topic,
     Consumer<String, Long> consumer,
     RecordHandler<String, Long> recordHandler,
+    Clock clock,
+    Duration maxPollInterval,
+    Duration maxTimePerRecord,
+    BackOff backOffStrategy,
     Runnable closeCallback)
   {
     this.id = clientId;
@@ -102,7 +108,7 @@ public class ExampleConsumer implements Runnable
     Integer partition,
     Long offset,
     String key,
-    Long value)
+    Long value) throws RetriableErrorException, NonRetriableErrorException
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
diff --git a/src/main/java/de/juplo/kafka/NonRetriableErrorException.java b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java
new file mode 100644
index 0000000..0eb0ff2
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+public class NonRetriableErrorException extends Exception
+{
+  public NonRetriableErrorException(String message)
+  {
+    super(message);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
index a7b65af..5edcf58 100644
@@ -7,5 +7,5 @@ public interface RecordHandler<K, V>
     Integer partition,
     Long offset,
     K key,
-    V value);
+    V value) throws RetriableErrorException, NonRetriableErrorException;
 }
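
With the widened throws clause, a concrete RecordHandler can classify its failures instead of only logging them. A hypothetical sketch of what a handler replacing the no-op bean from ApplicationConfiguration could look like; the storeInDatabase() placeholder and the failure rules are invented for illustration and are not part of the commit:

package de.juplo.kafka;

import java.util.concurrent.TimeoutException;

public class ClassifyingRecordHandler
{
  // Mirrors what a recordHandler() bean could return instead of the no-op lambda above.
  public static RecordHandler<String, Long> classifyingHandler()
  {
    return (topic, partition, offset, key, value) ->
    {
      if (value == null || value < 0)
      {
        // Broken payload: a retry cannot succeed, so the record should be skipped
        throw new NonRetriableErrorException(
            "Invalid value " + value + " at " + topic + "/" + partition + " offset " + offset);
      }
      try
      {
        storeInDatabase(key, value);
      }
      catch (TimeoutException e)
      {
        // Transient infrastructure problem: signal that another attempt may help
        throw new RetriableErrorException("Timeout while storing " + key);
      }
    };
  }

  private static void storeInDatabase(String key, Long value) throws TimeoutException
  {
    // Placeholder for a real downstream call
  }
}
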
diff --git a/src/main/java/de/juplo/kafka/RetriableErrorException.java b/src/main/java/de/juplo/kafka/RetriableErrorException.java
new file mode 100644
index 0000000..598ddb0
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+public class RetriableErrorException extends Exception
+{
+  public RetriableErrorException(String message)
+  {
+    super(message);
+  }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 7a06731..21b0996 100644
@@ -6,6 +6,10 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+    max-poll-interval: 5m
+    max-poll-records: 500
+    max-time-per-record: 30s
+    num-retries: 10
 management:
   endpoint:
     shutdown:
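
The duration values above (5s, 5m, 30s) are bound to the Duration fields of ApplicationProperties by Spring Boot's relaxed binding. A minimal sketch that only illustrates the conversion rule, not code from the commit:

import java.time.Duration;
import org.springframework.boot.convert.DurationStyle;

public class DurationBindingDemo
{
  public static void main(String[] args)
  {
    // Spring Boot parses the simple duration format used in application.yml.
    Duration maxPollInterval = DurationStyle.SIMPLE.parse("5m");
    Duration maxTimePerRecord = DurationStyle.SIMPLE.parse("30s");

    System.out.println(maxPollInterval);   // PT5M
    System.out.println(maxTimePerRecord);  // PT30S
  }
}
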
diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
index 590c9cd..b480fa3 100644
@@ -20,7 +20,9 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.FixedBackOff;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
@@ -161,7 +163,9 @@ public class ExampleConsumerTest
 
 
   @BeforeEach
-  void createExampleConsumer(@Autowired ApplicationProperties properties)
+  void createExampleConsumer(
+    @Autowired ApplicationProperties properties,
+    @Autowired Clock clock)
   {
     ApplicationConfiguration configuration = new ApplicationConfiguration();
     Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
@@ -171,6 +175,10 @@ public class ExampleConsumerTest
       TOPIC,
       consumer,
       mockRecordHandler,
+      clock,
+      properties.getConsumerProperties().getMaxPollInterval(),
+      properties.getConsumerProperties().getMaxTimePerRecord(),
+      new FixedBackOff(0L, properties.getConsumerProperties().getNumRetries()),
       () -> isTerminatedExceptionally.set(true));
   }
 
@@ -249,5 +257,11 @@ public class ExampleConsumerTest
       properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
       return AdminClient.create(properties);
     }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
   }
 }
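
Both the application and the test configuration register Clock.systemDefaultZone(). If a test ever needed deterministic timing, for example to force the max-time-per-record budget to be exceeded, a fixed or offset clock could be supplied instead. A sketch of the java.time.Clock API only, not something this commit does:

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;

public class FixedClockExample
{
  public static void main(String[] args)
  {
    // A clock frozen at a known instant: every call to instant() returns the same value.
    Clock fixedClock = Clock.fixed(Instant.parse("2024-12-24T12:00:00Z"), ZoneId.of("UTC"));
    System.out.println(fixedClock.instant());   // 2024-12-24T12:00:00Z

    // A clock running 30 seconds ahead of the system clock, which could be used
    // to simulate that a per-record deadline has already passed.
    Clock skewedClock = Clock.offset(Clock.systemUTC(), Duration.ofSeconds(30));
    System.out.println(skewedClock.instant());
  }
}
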