@Slf4j
public class ExampleConsumer
{
- private String id;
- private String topic;
- private Consumer<String, String> consumer;
+ private final String id;
+ private final String topic;
+ private final Consumer<String, String> consumer;
+ private volatile boolean running = false;
private long consumed = 0;
- public static void main(String[] args) throws Exception
- {
- String broker = "localhost:9092";
- String topic = "test";
- String groupId = "my-group";
- String clientId = "DEV";
+ public ExampleConsumer(
+ String broker,
+ String topic,
+ String groupId,
+ String clientId)
+ {
Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("group.id", groupId); // ID für die Offset-Commits
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
+ this.consumer = new KafkaConsumer<>(props);
+ this.id = clientId;
+ this.topic = topic;
+ }
+
- String id = clientId;
- long consumed = 0;
+ void run()
+ {
+ running = true;
+ consumed = 0;
try
{
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}
+
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length !=4)
+ {
+ System.exit(1);
+ return;
+ }
+
+ ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() ->
+ {
+ while(instance.running)
+ {
+ instance.consumer.close();
+
+ try
+ {
+ log.info("{}: Waiting...", instance.id);
+ Thread.sleep(1000);
+ }
+ catch (Exception e) {}
+ }
+
+ log.info("{}: DONE!", instance.id);
+ }));
+
+ instance.run();
+ }
}