From bf67d48c902d9289aa786ed5b1f6e083a6e67c19 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 29 Sep 2024 14:17:07 +0200 Subject: [PATCH] Der Consumer kann mit mehreren Topics konfiguriert werden --- .../java/de/juplo/kafka/ExampleConsumer.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 710f45a..98830ba 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -17,13 +17,17 @@ import java.util.Properties; public class ExampleConsumer { private final String id; - private final String topic; + private final String[] topics; private final Consumer consumer; private volatile boolean running = false; private long consumed = 0; - public ExampleConsumer(String broker, String topic, String groupId, String clientId) + public ExampleConsumer( + String broker, + String groupId, + String clientId, + String... topics) { Properties props = new Properties(); props.put("bootstrap.servers", broker); @@ -36,7 +40,7 @@ public class ExampleConsumer props.put("metadata.maxage.ms", 5000); this.id = clientId; - this.topic = topic; + this.topics = topics; consumer = new KafkaConsumer<>(props); } @@ -45,8 +49,8 @@ public class ExampleConsumer { try { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + log.info("{} - Subscribing to topics: {}", id, topics); + consumer.subscribe(Arrays.asList(topics)); running = true; while (true) @@ -92,7 +96,7 @@ public class ExampleConsumer public static void main(String[] args) throws Exception { String broker = ":9092"; - String topic = "test"; + String[] topics = new String[] { "test" }; String groupId = "my-group"; String clientId = "DEV"; @@ -103,13 +107,13 @@ public class ExampleConsumer case 3: groupId = args[2]; case 2: - topic = args[1]; + topics = args[1].split(","); case 1: broker = args[0]; } - ExampleConsumer instance = new ExampleConsumer(broker, topic, groupId, clientId); + ExampleConsumer instance = new ExampleConsumer(broker, groupId, clientId, topics); Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -128,11 +132,11 @@ public class ExampleConsumer })); log.info( - "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}", + "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", broker, - topic, groupId, - clientId); + clientId, + topics); instance.run(); } } -- 2.20.1