From: Kai Moritz Date: Thu, 19 Dec 2024 20:34:17 +0000 (+0100) Subject: Konfiguration über `ApplicationConiguration` - aber von Hand X-Git-Tag: consumer/spring-consumer--record-handler--2025-01-signal X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=2cc7799850ad4308546fdb552be8b5ebb38c93d3;p=demos%2Fkafka%2Ftraining Konfiguration über `ApplicationConiguration` - aber von Hand --- diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 03269fd2..590c9cdf 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -4,12 +4,9 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -25,10 +22,8 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import java.time.Duration; -import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -41,9 +36,11 @@ import static de.juplo.kafka.ExampleConsumerTest.TOPIC; @SpringBootTest( classes = { KafkaAutoConfiguration.class, + ApplicationProperties.class, ExampleConsumerTest.ConsumerRunnableTestConfig.class, }, properties = { + "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", }) @@ -164,17 +161,10 @@ public class ExampleConsumerTest @BeforeEach - void createExampleConsumer(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + void createExampleConsumer(@Autowired ApplicationProperties properties) { - Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBroker); - props.put("client.id", ID); - props.put("group.id", ID); - props.put("auto.offset.reset", "earliest"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); - - Consumer consumer = new KafkaConsumer<>(props); + ApplicationConfiguration configuration = new ApplicationConfiguration(); + Consumer consumer = configuration.kafkaConsumer(properties); exampleConsumer = new ExampleConsumer( ID,