From 3eae3e516821596bd3ce9047a4fee32a0d5f90cb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 28 Oct 2024 08:46:58 +0100 Subject: [PATCH] Consumerspezifische Properties werden in eigener nested Class verwaltet MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Dadurch wird der Code übersichtlicher, wenn spätere Implementierungen _sowohl_ als Consumer, _als auch_ als Producer agieren! --- docker/docker-compose.yml | 4 +-- .../juplo/kafka/ApplicationConfiguration.java | 12 +++---- .../de/juplo/kafka/ApplicationProperties.java | 35 ++++++++++++++----- src/main/resources/application.yml | 17 ++++----- 4 files changed, 43 insertions(+), 25 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2366bc2..82c264d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -196,8 +196,8 @@ services: consumer: image: juplo/spring-consumer:1.1-SNAPSHOT environment: - juplo.consumer.bootstrap-server: kafka:9092 - juplo.consumer.client-id: consumer + juplo.bootstrap-server: kafka:9092 + juplo.client-id: consumer juplo.consumer.topic: test volumes: diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a3f3835..116d63d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -23,7 +23,7 @@ public class ApplicationConfiguration return new ExampleConsumer( properties.getClientId(), - properties.getTopic(), + properties.getConsumerProperties().getTopic(), kafkaConsumer); } @@ -33,14 +33,14 @@ public class ApplicationConfiguration Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getGroupId()); - if (properties.getAutoOffsetReset() != null) + props.put("group.id", properties.getConsumerProperties().getGroupId()); + if (properties.getConsumerProperties().getAutoOffsetReset() != null) { - props.put("auto.offset.reset", properties.getAutoOffsetReset().name()); + props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); } - if (properties.autoCommitInterval != null) + if (properties.getConsumerProperties().getAutoCommitInterval() != null) { - props.put("auto.commit.interval", properties.getAutoCommitInterval()); + props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); } props.put("metadata.maxage.ms", 5000); // 5 Sekunden props.put("partition.assignment.strategy", StickyAssignor.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index f7134fb..c8193c9 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -10,7 +10,7 @@ import org.springframework.validation.annotation.Validated; import java.time.Duration; -@ConfigurationProperties(prefix = "juplo.consumer") +@ConfigurationProperties(prefix = "juplo") @Validated @Getter @Setter @@ -22,14 +22,31 @@ public class ApplicationProperties @NotNull @NotEmpty private String clientId; + @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String topic; - ApplicationProperties.OffsetReset autoOffsetReset; - Duration autoCommitInterval; + private ConsumerProperties consumer; + + + public ConsumerProperties getConsumerProperties() + { + return consumer; + } + + + @Validated + @Getter + @Setter + static class ConsumerProperties + { + @NotNull + @NotEmpty + private String groupId; + @NotNull + @NotEmpty + private String topic; + private OffsetReset autoOffsetReset; + private Duration autoCommitInterval; - enum OffsetReset { latest, earliest, none} + enum OffsetReset { latest, earliest, none } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5a89ee5..7a06731 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,7 +1,7 @@ juplo: + bootstrap-server: :9092 + client-id: DEV consumer: - bootstrap-server: :9092 - client-id: DEV group-id: my-group topic: test auto-offset-reset: earliest @@ -21,12 +21,13 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.consumer.bootstrap-server} - client-id: ${juplo.consumer.client-id} - group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} - auto-offset-reset: ${juplo.consumer.auto-offset-reset} - auto-commit-interval: ${juplo.consumer.auto-commit-interval} + bootstrap-server: ${juplo.bootstrap-server} + client-id: ${juplo.client-id} + consumer: + group-id: ${juplo.consumer.group-id} + topic: ${juplo.consumer.topic} + auto-offset-reset: ${juplo.consumer.auto-offset-reset} + auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: level: root: INFO -- 2.20.1