consumer:
image: juplo/spring-consumer:1.1-SNAPSHOT
environment:
- juplo.consumer.bootstrap-server: kafka:9092
- juplo.consumer.client-id: consumer
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: consumer
juplo.consumer.topic: test
volumes:
return
new ExampleConsumer(
properties.getClientId(),
- properties.getTopic(),
+ properties.getConsumerProperties().getTopic(),
kafkaConsumer);
}
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
- props.put("group.id", properties.getGroupId());
- if (properties.getAutoOffsetReset() != null)
+ props.put("group.id", properties.getConsumerProperties().getGroupId());
+ if (properties.getConsumerProperties().getAutoOffsetReset() != null)
{
- props.put("auto.offset.reset", properties.getAutoOffsetReset().name());
+ props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
}
- if (properties.autoCommitInterval != null)
+ if (properties.getConsumerProperties().getAutoCommitInterval() != null)
{
- props.put("auto.commit.interval", properties.getAutoCommitInterval());
+ props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
}
props.put("metadata.maxage.ms", 5000); // 5 Sekunden
props.put("partition.assignment.strategy", StickyAssignor.class.getName());
import java.time.Duration;
-@ConfigurationProperties(prefix = "juplo.consumer")
+@ConfigurationProperties(prefix = "juplo")
@Validated
@Getter
@Setter
@NotNull
@NotEmpty
private String clientId;
+
@NotNull
- @NotEmpty
- private String groupId;
- @NotNull
- @NotEmpty
- private String topic;
- ApplicationProperties.OffsetReset autoOffsetReset;
- Duration autoCommitInterval;
+ private ConsumerProperties consumer;
+
+
+ public ConsumerProperties getConsumerProperties()
+ {
+ return consumer;
+ }
+
+
+ @Validated
+ @Getter
+ @Setter
+ static class ConsumerProperties
+ {
+ @NotNull
+ @NotEmpty
+ private String groupId;
+ @NotNull
+ @NotEmpty
+ private String topic;
+ private OffsetReset autoOffsetReset;
+ private Duration autoCommitInterval;
- enum OffsetReset { latest, earliest, none}
+ enum OffsetReset { latest, earliest, none }
+ }
}
juplo:
+ bootstrap-server: :9092
+ client-id: DEV
consumer:
- bootstrap-server: :9092
- client-id: DEV
group-id: my-group
topic: test
auto-offset-reset: earliest
enabled: true
info:
kafka:
- bootstrap-server: ${juplo.consumer.bootstrap-server}
- client-id: ${juplo.consumer.client-id}
- group-id: ${juplo.consumer.group-id}
- topic: ${juplo.consumer.topic}
- auto-offset-reset: ${juplo.consumer.auto-offset-reset}
- auto-commit-interval: ${juplo.consumer.auto-commit-interval}
+ bootstrap-server: ${juplo.bootstrap-server}
+ client-id: ${juplo.client-id}
+ consumer:
+ group-id: ${juplo.consumer.group-id}
+ topic: ${juplo.consumer.topic}
+ auto-offset-reset: ${juplo.consumer.auto-offset-reset}
+ auto-commit-interval: ${juplo.consumer.auto-commit-interval}
logging:
level:
root: INFO