From: Kai Moritz Date: Tue, 1 Apr 2025 21:07:38 +0000 (+0200) Subject: Vorlage für das Live-Coding zum `simple-consumer` X-Git-Tag: grundlagen/simple-consumer--livecoding--2025-04-signal-spickzettel X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=8b523af7caf98711fe2d2eba26babd949f400176;p=demos%2Fkafka%2Ftraining Vorlage für das Live-Coding zum `simple-consumer` --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7be827e..b02d502 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -16,19 +16,19 @@ import java.util.Properties; @Slf4j public class ExampleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; + private String id; + private String topic; + private Consumer consumer; - private volatile boolean running = false; private long consumed = 0; - public ExampleConsumer( - String broker, - String topic, - String groupId, - String clientId) + public static void main(String[] args) throws Exception { + String broker = "localhost:9092"; + String topic = "test"; + String groupId = "my-group"; + String clientId = "DEV"; + Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("group.id", groupId); // ID für die Offset-Commits @@ -36,28 +36,26 @@ public class ExampleConsumer props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); - this.id = clientId; - this.topic = topic; - consumer = new KafkaConsumer<>(props); - } + Consumer consumer = new KafkaConsumer<>(props); + String id = clientId; + long consumed = 0; - public void run() - { try { log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - handleRecord( + consumed++; + log.info( + "{} - partition={}-{}, offset={}: {}={}", + id, record.topic(), record.partition(), record.offset(), @@ -66,76 +64,10 @@ public class ExampleConsumer } } } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - } finally { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - running = false; } } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - - - public static void main(String[] args) throws Exception - { - if (args.length != 4) - { - log.error("Four arguments required!"); - log.error("args[0]: Broker-Address"); - log.error("args[1]: Topic"); - log.error("args[2]: Group-ID"); - log.error("args[3]: Unique Client-ID"); - System.exit(1); - return; - } - - - log.info( - "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", - args[0], - args[1], - args[2], - args[3]); - - ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - instance.run(); - } }