public class ExampleConsumer
{
private final String id;
- private final String topic;
+ private final String[] topics;
private final Consumer<String, String> consumer;
private volatile boolean running = false;
private long consumed = 0;
- public ExampleConsumer(String broker, String topic, String groupId, String clientId)
+ public ExampleConsumer(
+ String broker,
+ String groupId,
+ String clientId,
+ String... topics)
{
Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("metadata.maxage.ms", 5000);
this.id = clientId;
- this.topic = topic;
+ this.topics = topics;
consumer = new KafkaConsumer<>(props);
}
{
try
{
- log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ log.info("{} - Subscribing to topics: {}", id, topics);
+ consumer.subscribe(Arrays.asList(topics));
running = true;
while (true)
public static void main(String[] args) throws Exception
{
String broker = ":9092";
- String topic = "test";
+ String[] topics = new String[] { "test" };
String groupId = "my-group";
String clientId = "DEV";
case 3:
groupId = args[2];
case 2:
- topic = args[1];
+ topics = args[1].split(",");
case 1:
broker = args[0];
}
- ExampleConsumer instance = new ExampleConsumer(broker, topic, groupId, clientId);
+ ExampleConsumer instance = new ExampleConsumer(broker, groupId, clientId, topics);
Runtime.getRuntime().addShutdownHook(new Thread(() ->
{
}));
log.info(
- "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}",
+ "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}",
broker,
- topic,
groupId,
- clientId);
+ clientId,
+ topics);
instance.run();
}
}