Springify: ROT - Auto Startup in @KafkaListener deaktiviert
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
index 5cefa32..4c74eb2 100644 (file)
@@ -4,6 +4,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
 
 import java.util.function.Consumer;
 
@@ -20,4 +24,22 @@ public class ApplicationConfiguration
       // Handle record
     };
   }
+
+  @Bean
+  public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, Long> consumerFactory)
+  {
+    ConcurrentKafkaListenerContainerFactory<String, Long> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+
+    factory.setConsumerFactory(consumerFactory);
+    factory.setBatchListener(true);
+
+    return factory;
+  }
+
+  @Bean
+  public CommonContainerStoppingErrorHandler errorHandler()
+  {
+    return new CommonContainerStoppingErrorHandler();
+  }
 }