Springify: Der Kafka-`Consumer` wird über die Spring-Factory erzeugt
authorKai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 09:24:55 +0000 (11:24 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 15:14:39 +0000 (17:14 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 3c526df..ce2d450 100644 (file)
@@ -1,15 +1,12 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.ConsumerFactory;
 
-import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
@@ -30,7 +27,7 @@ public class ApplicationConfiguration
 
   @Bean
   public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
+      org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer,
       ExecutorService executor,
       Consumer<ConsumerRecord<String, Long>> handler,
       KafkaProperties kafkaProperties,
@@ -52,18 +49,8 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(KafkaProperties properties)
+  public org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer(ConsumerFactory<String, Long> factory)
   {
-    Properties props = new Properties();
-
-    props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
-    props.put("group.id", properties.getConsumer().getGroupId());
-    props.put("client.id", properties.getConsumer().getClientId());
-    props.put("auto.offset.reset", properties.getConsumer().getAutoOffsetReset());
-    props.put("metadata.max.age.ms", "1000");
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
-
-    return new KafkaConsumer<>(props);
+    return factory.createConsumer();
   }
 }
index d446bbe..a185b72 100644 (file)
@@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
@@ -36,7 +37,11 @@ import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+               initializers = ConfigDataApplicationContextInitializer.class,
+               classes = {
+                               KafkaAutoConfiguration.class,
+                               ApplicationTests.Configuration.class })
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
                properties = {
@@ -57,7 +62,7 @@ class ApplicationTests
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
-       KafkaConsumer<String, Long> kafkaConsumer;
+       org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer;
        @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired