From fd38f05b25d078a8c5daa870142f20092015f05f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Apr 2025 19:51:54 +0200 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20das=20Live-Coding=20zum=20`?= =?utf8?q?simple-producer`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleProducer.java | 132 +++--------------- 1 file changed, 20 insertions(+), 112 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0bab4426..41eedb81 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,143 +12,51 @@ import java.util.Properties; @Slf4j public class ExampleProducer { - private final String id; - private final String topic; - private final Producer producer; + private String id; + private String topic; + private Producer producer; private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; - public ExampleProducer( - String broker, - String topic, - String clientId) + public static void main(String[] args) throws Exception { + String broker = "localhost:9092"; + String clientId = "DEV"; + String topic = "test"; + Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("client.id", clientId); // Nur zur Wiedererkennung props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - this.id = clientId; - this.topic = topic; - producer = new KafkaProducer<>(props); - } + Producer producer = new KafkaProducer<>(props); - public void run() - { + String id = clientId; long i = 0; + boolean running = true; + boolean done = false; + long produced = 0; try { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + Long.toString(i%10), // Key + Long.toString(i) // Value + ); + + producer.send(record); Thread.sleep(500); } } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); - done = true; } } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - now - time - ); - } - - - public static void main(String[] args) throws Exception - { - if (args.length != 3) - { - log.error("Three arguments required!"); - log.error("arg[0]: Broker-Address"); - log.error("arg[1]: Topic"); - log.error("arg[2]: Unique Client-ID"); - System.exit(1); - return; - } - - log.info( - "Running ExampleProducer: broker={}, topic={}, client-id={}", - args[0], - args[1], - args[2]); - - ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.running = false; - while (!instance.done) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - instance.run(); - } } -- 2.20.1