From: Kai Moritz Date: Sun, 29 Sep 2024 12:17:07 +0000 (+0200) Subject: Der Consumer kann mit mehreren Topics konfiguriert werden X-Git-Tag: consumer/spring-consumer--REBASE-ANFANG~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bf67d48c902d9289aa786ed5b1f6e083a6e67c19;p=demos%2Fkafka%2Ftraining Der Consumer kann mit mehreren Topics konfiguriert werden --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 710f45a..98830ba 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -17,13 +17,17 @@ import java.util.Properties; public class ExampleConsumer { private final String id; - private final String topic; + private final String[] topics; private final Consumer consumer; private volatile boolean running = false; private long consumed = 0; - public ExampleConsumer(String broker, String topic, String groupId, String clientId) + public ExampleConsumer( + String broker, + String groupId, + String clientId, + String... topics) { Properties props = new Properties(); props.put("bootstrap.servers", broker); @@ -36,7 +40,7 @@ public class ExampleConsumer props.put("metadata.maxage.ms", 5000); this.id = clientId; - this.topic = topic; + this.topics = topics; consumer = new KafkaConsumer<>(props); } @@ -45,8 +49,8 @@ public class ExampleConsumer { try { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + log.info("{} - Subscribing to topics: {}", id, topics); + consumer.subscribe(Arrays.asList(topics)); running = true; while (true) @@ -92,7 +96,7 @@ public class ExampleConsumer public static void main(String[] args) throws Exception { String broker = ":9092"; - String topic = "test"; + String[] topics = new String[] { "test" }; String groupId = "my-group"; String clientId = "DEV"; @@ -103,13 +107,13 @@ public class ExampleConsumer case 3: groupId = args[2]; case 2: - topic = args[1]; + topics = args[1].split(","); case 1: broker = args[0]; } - ExampleConsumer instance = new ExampleConsumer(broker, topic, groupId, clientId); + ExampleConsumer instance = new ExampleConsumer(broker, groupId, clientId, topics); Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -128,11 +132,11 @@ public class ExampleConsumer })); log.info( - "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}", + "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", broker, - topic, groupId, - clientId); + clientId, + topics); instance.run(); } }