import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
import java.util.Optional;
import java.util.Properties;
@Bean(destroyMethod = "close")
KafkaProducer<String, String> producer(TransferServiceProperties properties)
{
+ Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+ Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
+
Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@Bean
KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
{
+ Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+ Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+
Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
{
return
new TransferConsumer(
- properties.topic,
+ properties.getTopic(),
consumer,
mapper,
new TransferConsumer.ConsumerUseCases() {
ObjectMapper mapper,
TransferServiceProperties properties)
{
- return new KafkaMessagingService(producer, mapper, properties.topic);
+ return new KafkaMessagingService(producer, mapper, properties.getTopic());
}
@Bean