Version des `spring-consumer` mit einem Dead-Letter-Topic
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
index 273cee5..f26d7a2 100644 (file)
@@ -2,13 +2,36 @@ package de.juplo.kafka;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
 
 
 @SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class Application
 {
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+    KafkaOperations<?, ?> kafkaTemplate)
+  {
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
+  }
+
+  @Bean
+  public DefaultErrorHandler errorHandler(
+    DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(
+      recoverer,
+      new FixedBackOff(0l, 0l));
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);