- String broker = ":9092";
- String topic = "test";
- String groupId = "my-group";
- String clientId = "DEV";
-
- switch (args.length)
- {
- case 4:
- clientId = args[3];
- case 3:
- groupId = args[2];
- case 2:
- topic = args[1];
- case 1:
- broker = args[0];
- }
-
-
- SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId);
-
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.consumer.wakeup();
-
- while (instance.running)
- {
- log.info("Waiting for main-thread...");
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {}
- }
- log.info("Shutdown completed.");
- }));
-
- log.info(
- "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}",
- broker,
- topic,
- groupId,
- clientId);
- instance.run();
+ consumed++;
+ log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+ messageHandler.handle(key, value);