producer:
image: juplo/spring-producer:1.0-SNAPSHOT
environment:
- juplo.producer.bootstrap-server: kafka:9092
- juplo.producer.client-id: producer
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: producer
juplo.producer.topic: test
consumer-1:
return
new ExampleProducer(
properties.getClientId(),
- properties.getTopic(),
+ properties.getProducerProperties().getTopic(),
kafkaProducer);
}
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
- props.put("acks", properties.getAcks());
- props.put("batch.size", properties.getBatchSize());
+ props.put("acks", properties.getProducerProperties().getAcks());
+ props.put("batch.size", properties.getProducerProperties().getBatchSize());
props.put("metadata.max.age.ms", 5000); // 5 Sekunden
props.put("delivery.timeout.ms", 20000); // 20 Sekunden
props.put("request.timeout.ms", 10000); // 10 Sekunden
- props.put("linger.ms", properties.getLinger().toMillis());
- props.put("compression.type", properties.getCompressionType());
+ props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis());
+ props.put("compression.type", properties.getProducerProperties().getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
import java.time.Duration;
-@ConfigurationProperties(prefix = "juplo.producer")
+@ConfigurationProperties(prefix = "juplo")
@Validated
@Getter
@Setter
@NotNull
@NotEmpty
private String clientId;
+
@NotNull
- @NotEmpty
- private String topic;
- @NotNull
- @NotEmpty
- private String acks;
- @NotNull
- private Integer batchSize;
- @NotNull
- private Duration linger;
- @NotNull
- @NotEmpty
- private String compressionType;
+ private ProducerProperties producer;
+
+
+ public ProducerProperties getProducerProperties()
+ {
+ return producer;
+ }
+
+
+ @Validated
+ @Getter
+ @Setter
+ static class ProducerProperties
+ {
+ @NotNull
+ @NotEmpty
+ private String topic;
+ @NotNull
+ @NotEmpty
+ private String acks;
+ @NotNull
+ private Integer batchSize;
+ @NotNull
+ private Duration linger;
+ @NotNull
+ @NotEmpty
+ private String compressionType;
+ }
}
juplo:
+ bootstrap-server: :9092
+ client-id: DEV
producer:
- bootstrap-server: :9092
- client-id: DEV
topic: test
acks: -1
batch-size: 16384
enabled: true
info:
kafka:
- bootstrap-server: ${juplo.producer.bootstrap-server}
- client-id: ${juplo.producer.client-id}
- topic: ${juplo.producer.topic}
- acks: ${juplo.producer.acks}
- batch-size: ${juplo.producer.batch-size}
- linger: ${juplo.producer.linger}
- compression-type: ${juplo.producer.compression-type}
+ bootstrap-server: ${juplo.bootstrap-server}
+ client-id: ${juplo.client-id}
+ producer:
+ topic: ${juplo.producer.topic}
+ acks: ${juplo.producer.acks}
+ batch-size: ${juplo.producer.batch-size}
+ linger: ${juplo.producer.linger}
+ compression-type: ${juplo.producer.compression-type}
logging:
level:
root: INFO
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest",
- "juplo.producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.producer.topic=" + TOPIC})
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)