Der Consumer kann mit mehreren Topics konfiguriert werden
authorKai Moritz <kai@juplo.de>
Sun, 29 Sep 2024 12:17:07 +0000 (14:17 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 29 Sep 2024 12:29:54 +0000 (14:29 +0200)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 710f45a..98830ba 100644 (file)
@@ -17,13 +17,17 @@ import java.util.Properties;
 public class ExampleConsumer
 {
   private final String id;
-  private final String topic;
+  private final String[] topics;
   private final Consumer<String, String> consumer;
 
   private volatile boolean running = false;
   private long consumed = 0;
 
-  public ExampleConsumer(String broker, String topic, String groupId, String clientId)
+  public ExampleConsumer(
+    String broker,
+    String groupId,
+    String clientId,
+    String... topics)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", broker);
@@ -36,7 +40,7 @@ public class ExampleConsumer
     props.put("metadata.maxage.ms", 5000);
 
     this.id = clientId;
-    this.topic = topic;
+    this.topics = topics;
     consumer = new KafkaConsumer<>(props);
   }
 
@@ -45,8 +49,8 @@ public class ExampleConsumer
   {
     try
     {
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      log.info("{} - Subscribing to topics: {}", id, topics);
+      consumer.subscribe(Arrays.asList(topics));
       running = true;
 
       while (true)
@@ -92,7 +96,7 @@ public class ExampleConsumer
   public static void main(String[] args) throws Exception
   {
     String broker = ":9092";
-    String topic = "test";
+    String[] topics = new String[] { "test" };
     String groupId = "my-group";
     String clientId = "DEV";
 
@@ -103,13 +107,13 @@ public class ExampleConsumer
       case 3:
         groupId = args[2];
       case 2:
-        topic = args[1];
+        topics = args[1].split(",");
       case 1:
         broker = args[0];
     }
 
 
-    ExampleConsumer instance = new ExampleConsumer(broker, topic, groupId, clientId);
+    ExampleConsumer instance = new ExampleConsumer(broker, groupId, clientId, topics);
 
     Runtime.getRuntime().addShutdownHook(new Thread(() ->
     {
@@ -128,11 +132,11 @@ public class ExampleConsumer
     }));
 
     log.info(
-        "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}",
+        "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}",
         broker,
-        topic,
         groupId,
-        clientId);
+        clientId,
+        topics);
     instance.run();
   }
 }