<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
public class ApplicationConfiguration
{
@Bean
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
Consumer<ConsumerRecord<String, Long>> handler,
- ApplicationProperties properties)
+ KafkaProperties kafkaProperties,
+ ApplicationProperties applicationProperties)
{
return
new EndlessConsumer<>(
executor,
- properties.getClientId(),
- properties.getTopic(),
+ kafkaProperties.getConsumer().getClientId(),
+ applicationProperties.getTopic(),
kafkaConsumer,
handler);
}
}
@Bean(destroyMethod = "close")
- public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+ public KafkaConsumer<String, Long> kafkaConsumer(KafkaProperties properties)
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("group.id", properties.getGroupId());
- props.put("client.id", properties.getClientId());
- props.put("auto.offset.reset", properties.getAutoOffsetReset());
+ props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
+ props.put("group.id", properties.getConsumer().getGroupId());
+ props.put("client.id", properties.getConsumer().getClientId());
+ props.put("auto.offset.reset", properties.getConsumer().getAutoOffsetReset());
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", LongDeserializer.class.getName());
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String groupId;
- @NotNull
- @NotEmpty
- private String clientId;
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String autoOffsetReset;
}
consumer:
- bootstrap-server: :9092
- group-id: my-group
- client-id: DEV
topic: test
- auto-offset-reset: earliest
management:
endpoint:
shutdown:
enabled: true
info:
kafka:
- bootstrap-server: ${consumer.bootstrap-server}
- client-id: ${consumer.client-id}
- group-id: ${consumer.group-id}
+ bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
+ client-id: ${spring.kafka.consumer.client-id}
+ group-id: ${spring.kafka.consumer.group-id}
topic: ${consumer.topic}
- auto-offset-reset: ${consumer.auto-offset-reset}
+ auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
+spring:
+ kafka:
+ consumer:
+ bootstrap-servers: :9092
+ client-id: DEV
+ auto-offset-reset: earliest
+ group-id: my-group
+ value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
logging:
level:
root: INFO
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
properties = {
- "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"consumer.topic=" + TOPIC })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j
@Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
- ApplicationProperties properties;
+ ApplicationProperties applicationProperties;
+ @Autowired
+ KafkaProperties kafkaProperties;
@Autowired
ExecutorService executor;
endlessConsumer =
new EndlessConsumer<>(
executor,
- properties.getClientId(),
- properties.getTopic(),
+ kafkaProperties.getConsumer().getClientId(),
+ applicationProperties.getTopic(),
kafkaConsumer,
captureOffsetAndExecuteTestHandler);
}
@Bean
- KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+ KafkaProducer<String, Bytes> kafkaProducer(KafkaProperties properties)
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
props.put("linger.ms", 100);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", BytesSerializer.class.getName());
}
@Bean
- KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+ KafkaConsumer<Bytes, Bytes> offsetConsumer(KafkaProperties properties)
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
props.put("client.id", "OFFSET-CONSUMER");
- props.put("group.id", properties.getGroupId());
+ props.put("group.id", properties.getConsumer().getGroupId());
props.put("key.deserializer", BytesDeserializer.class.getName());
props.put("value.deserializer", BytesDeserializer.class.getName());