Producerspezifische Properties werden in eigener nested Class verwaltet
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 08:27:04 +0000 (09:27 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 29 Oct 2024 17:06:38 +0000 (18:06 +0100)
* Dadurch wird der Code übersichtlicher, wenn spätere Implementierungen
  _sowohl_ als Consumer, _als auch_ als Producer agieren!

docker/docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 5807af0..c417a7f 100644 (file)
@@ -192,8 +192,8 @@ services:
   producer:
     image: juplo/spring-producer:1.0-SNAPSHOT
     environment:
-      juplo.producer.bootstrap-server: kafka:9092
-      juplo.producer.client-id: producer
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: producer
       juplo.producer.topic: test
 
   consumer-1:
index 776c737..2491f09 100644 (file)
@@ -22,7 +22,7 @@ public class ApplicationConfiguration
     return
         new ExampleProducer(
             properties.getClientId(),
-            properties.getTopic(),
+            properties.getProducerProperties().getTopic(),
             kafkaProducer);
   }
 
@@ -32,13 +32,13 @@ public class ApplicationConfiguration
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getAcks());
-    props.put("batch.size", properties.getBatchSize());
+    props.put("acks", properties.getProducerProperties().getAcks());
+    props.put("batch.size", properties.getProducerProperties().getBatchSize());
     props.put("metadata.maxage.ms",   5000); //  5 Sekunden
     props.put("delivery.timeout.ms", 20000); // 20 Sekunden
     props.put("request.timeout.ms",  10000); // 10 Sekunden
-    props.put("linger.ms", properties.getLingerMs());
-    props.put("compression.type", properties.getCompressionType());
+    props.put("linger.ms", properties.getProducerProperties().getLingerMs());
+    props.put("compression.type", properties.getProducerProperties().getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
index f33887c..1f83246 100644 (file)
@@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.validation.annotation.Validated;
 
 
-@ConfigurationProperties(prefix = "juplo.producer")
+@ConfigurationProperties(prefix = "juplo")
 @Validated
 @Getter
 @Setter
@@ -20,17 +20,34 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String clientId;
+
   @NotNull
-  @NotEmpty
-  private String topic;
-  @NotNull
-  @NotEmpty
-  private String acks;
-  @NotNull
-  private Integer batchSize;
-  @NotNull
-  private Integer lingerMs;
-  @NotNull
-  @NotEmpty
-  private String compressionType;
+  private ProducerProperties producer;
+
+
+  public ProducerProperties getProducerProperties()
+  {
+    return producer;
+  }
+
+
+  @Validated
+  @Getter
+  @Setter
+  static class ProducerProperties
+  {
+    @NotNull
+    @NotEmpty
+    private String topic;
+    @NotNull
+    @NotEmpty
+    private String acks;
+    @NotNull
+    private Integer batchSize;
+    @NotNull
+    private Integer lingerMs;
+    @NotNull
+    @NotEmpty
+    private String compressionType;
+  }
 }
index 6c2c08c..9785648 100644 (file)
@@ -1,7 +1,7 @@
 juplo:
+  bootstrap-server: :9092
+  client-id: DEV
   producer:
-    bootstrap-server: :9092
-    client-id: DEV
     topic: test
     acks: -1
     batch-size: 16384
@@ -22,13 +22,14 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${juplo.producer.bootstrap-server}
-    client-id: ${juplo.producer.client-id}
-    topic: ${juplo.producer.topic}
-    acks: ${juplo.producer.acks}
-    batch-size: ${juplo.producer.batch-size}
-    linger-ms: ${juplo.producer.linger-ms}
-    compression-type: ${juplo.producer.compression-type}
+    bootstrap-server: ${juplo.bootstrap-server}
+    client-id: ${juplo.client-id}
+    producer:
+      topic: ${juplo.producer.topic}
+      acks: ${juplo.producer.acks}
+      batch-size: ${juplo.producer.batch-size}
+      linger-ms: ${juplo.producer.linger-ms}
+      compression-type: ${juplo.producer.compression-type}
 logging:
   level:
     root: INFO
index 6d07e5a..fe8609e 100644 (file)
@@ -29,7 +29,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.consumer.auto-offset-reset=earliest",
-        "juplo.producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
         "juplo.producer.topic=" + TOPIC})
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)