import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
properties = {
- "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"consumer.topic=" + TOPIC })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j
@Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
- ApplicationProperties properties;
+ ApplicationProperties applicationProperties;
+ @Autowired
+ KafkaProperties kafkaProperties;
@Autowired
ExecutorService executor;
endlessConsumer =
new EndlessConsumer<>(
executor,
- properties.getClientId(),
- properties.getTopic(),
+ kafkaProperties.getConsumer().getClientId(),
+ applicationProperties.getTopic(),
kafkaConsumer,
captureOffsetAndExecuteTestHandler);
}
@Bean
- KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+ KafkaProducer<String, Bytes> kafkaProducer(KafkaProperties properties)
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
props.put("linger.ms", 100);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", BytesSerializer.class.getName());
}
@Bean
- KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+ KafkaConsumer<Bytes, Bytes> offsetConsumer(KafkaProperties properties)
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
props.put("client.id", "OFFSET-CONSUMER");
- props.put("group.id", properties.getGroupId());
+ props.put("group.id", properties.getConsumer().getGroupId());
props.put("key.deserializer", BytesDeserializer.class.getName());
props.put("value.deserializer", BytesDeserializer.class.getName());