Konfiguration über `ApplicationConiguration` - aber von Hand consumer/spring-consumer--record-handler--COMMITS--2025-02
authorKai Moritz <kai@juplo.de>
Thu, 19 Dec 2024 20:34:17 +0000 (21:34 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 6 Feb 2025 17:03:39 +0000 (18:03 +0100)
src/test/java/de/juplo/kafka/ExampleConsumerTest.java

index 03269fd..590c9cd 100644 (file)
@@ -4,12 +4,9 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -25,10 +22,8 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.time.Duration;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -41,9 +36,11 @@ import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
 @SpringBootTest(
   classes = {
     KafkaAutoConfiguration.class,
+    ApplicationProperties.class,
     ExampleConsumerTest.ConsumerRunnableTestConfig.class,
   },
   properties = {
+    "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
     "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
   })
@@ -164,17 +161,10 @@ public class ExampleConsumerTest
 
 
   @BeforeEach
-  void createExampleConsumer(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+  void createExampleConsumer(@Autowired ApplicationProperties properties)
   {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", kafkaBroker);
-    props.put("client.id", ID);
-    props.put("group.id", ID);
-    props.put("auto.offset.reset", "earliest");
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
-
-    Consumer<String, Long> consumer = new KafkaConsumer<>(props);
+    ApplicationConfiguration configuration = new ApplicationConfiguration();
+    Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
 
     exampleConsumer = new ExampleConsumer(
       ID,