From 396a0b09f477cfec967831f4c503678f5ad50081 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Apr 2025 19:51:54 +0200 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20das=20Live-Coding=20zum=20`?= =?utf8?q?simple-producer`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleProducer.java | 135 +++--------------- 1 file changed, 19 insertions(+), 116 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index a7645889..914a163b 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,143 +12,46 @@ import java.util.Properties; @Slf4j public class ExampleProducer { - private final String id; - private final String topic; - private final Producer producer; - private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; - public ExampleProducer( - String broker, - String topic, - String clientId) + public static void main(String[] args) throws Exception { + String broker = "localhost:9092"; + String topic = "test"; + String clientId = "DEV"; + Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("client.id", clientId); // Nur zur Wiedererkennung props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - this.id = clientId; - this.topic = topic; - producer = new KafkaProducer<>(props); - } + Producer producer = new KafkaProducer<>(props); - public void run() - { - long i = 0; + String id = clientId; try { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); - } - } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - } - finally - { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - done = true; - } - } - - void send(String key, String value) - { - final long sendRequested = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long sendRequestProcessed = System.currentTimeMillis(); - if (e == null) + for (long i = 0; true; i++) { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - sendRequestProcessed - sendRequested + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + Long.toString(i%10), // Key + Long.toString(i) // Value ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - sendRequestProcessed - sendRequested, - e.toString() - ); - } - }); - long sendRequestQueued = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - sendRequestQueued - sendRequested - ); - } + producer.send(record); + log.debug("{} - Sent message {}={}", id, record.key(), record.value()); - public static void main(String[] args) throws Exception - { - if (args.length != 3) - { - log.error("Three arguments required!"); - log.error("arg[0]: Broker-Address"); - log.error("arg[1]: Topic"); - log.error("arg[2]: Unique Client-ID"); - System.exit(1); - return; + Thread.sleep(500); + } } - - log.info( - "Running ExampleProducer: broker={}, topic={}, client-id={}", - args[0], - args[1], - args[2]); - - ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> + finally { - instance.running = false; - while (!instance.done) - { - log.info("{} - Waiting for main-thread...", instance.id); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("{} - Shutdown completed.", instance.id); - })); - - instance.run(); + log.info("{}: Exiting!", id); + } } } -- 2.39.5