From: Kai Moritz Date: Fri, 1 Dec 2023 16:37:51 +0000 (+0100) Subject: Der Consumer kann mit mehreren Topics konfiguriert werden X-Git-Tag: consumer/spring-consumer--BRANCH-ENDE~15 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=48ae89d062eda2d83d1504e232017ad173330e6d;p=demos%2Fkafka%2Ftraining Der Consumer kann mit mehreren Topics konfiguriert werden --- diff --git a/README.sh b/README.sh index b32d9f5..5e265f9 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 715dd8e..a533c73 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -216,7 +216,7 @@ services: command: kafka:9092 test producer consumer: - image: juplo/spring-consumer:1.0-SNAPSHOT + image: juplo/spring-consumer:1.1-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer diff --git a/pom.xml b/pom.xml index 13ded01..f161e17 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.0-SNAPSHOT + 1.1-SNAPSHOT 17 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 62d61a2..9a237b7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -22,7 +22,7 @@ public class ApplicationConfiguration return new SimpleConsumer( kafkaProperties.getClientId(), - applicationProperties.getTopic(), + applicationProperties.getTopics(), kafkaConsumer); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index a4cc8b8..4092fa2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -17,5 +17,5 @@ public class ApplicationProperties { @NotNull @NotEmpty - private String topic; + private String[] topics; } diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index aadc11f..a76132b 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -17,7 +17,7 @@ import java.util.concurrent.Callable; public class SimpleConsumer implements Callable { private final String id; - private final String topic; + private final String[] topics; private final Consumer consumer; private long consumed = 0; @@ -28,8 +28,8 @@ public class SimpleConsumer implements Callable { try { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + log.info("{} - Subscribing to topics: {}", id, topics); + consumer.subscribe(Arrays.asList(topics)); while (true) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b7fedad..50b114d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ simple: consumer: - topic: test + topics: test management: endpoint: shutdown: @@ -19,7 +19,7 @@ info: bootstrap-server: ${spring.kafka.bootstrap-servers} client-id: ${spring.kafka.client-id} group-id: ${spring.kafka.consumer.group-id} - topic: ${simple.consumer.topic} + topics: ${simple.consumer.topics} auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} spring: kafka: diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java index 1baca99..584773e 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -14,7 +14,7 @@ import static de.juplo.kafka.ApplicationIT.TOPIC; webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "simple.consumer.topic=" + TOPIC }) + "simple.consumer.topics=" + TOPIC }) @EmbeddedKafka(topics = TOPIC) public class ApplicationIT {