From: Kai Moritz Date: Tue, 1 Apr 2025 17:51:54 +0000 (+0200) Subject: Vorlage für das Live-Coding zum `simple-producer` X-Git-Tag: grundlagen/simple-producer--livecoding--2026-03-22--20-47 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=396a0b09f477cfec967831f4c503678f5ad50081;p=demos%2Fkafka%2Ftraining Vorlage für das Live-Coding zum `simple-producer` --- diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index a7645889..914a163b 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,143 +12,46 @@ import java.util.Properties; @Slf4j public class ExampleProducer { - private final String id; - private final String topic; - private final Producer producer; - private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; - public ExampleProducer( - String broker, - String topic, - String clientId) + public static void main(String[] args) throws Exception { + String broker = "localhost:9092"; + String topic = "test"; + String clientId = "DEV"; + Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("client.id", clientId); // Nur zur Wiedererkennung props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - this.id = clientId; - this.topic = topic; - producer = new KafkaProducer<>(props); - } + Producer producer = new KafkaProducer<>(props); - public void run() - { - long i = 0; + String id = clientId; try { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); - } - } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - } - finally - { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - done = true; - } - } - - void send(String key, String value) - { - final long sendRequested = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long sendRequestProcessed = System.currentTimeMillis(); - if (e == null) + for (long i = 0; true; i++) { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - sendRequestProcessed - sendRequested + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + Long.toString(i%10), // Key + Long.toString(i) // Value ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - sendRequestProcessed - sendRequested, - e.toString() - ); - } - }); - long sendRequestQueued = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - sendRequestQueued - sendRequested - ); - } + producer.send(record); + log.debug("{} - Sent message {}={}", id, record.key(), record.value()); - public static void main(String[] args) throws Exception - { - if (args.length != 3) - { - log.error("Three arguments required!"); - log.error("arg[0]: Broker-Address"); - log.error("arg[1]: Topic"); - log.error("arg[2]: Unique Client-ID"); - System.exit(1); - return; + Thread.sleep(500); + } } - - log.info( - "Running ExampleProducer: broker={}, topic={}, client-id={}", - args[0], - args[1], - args[2]); - - ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> + finally { - instance.running = false; - while (!instance.done) - { - log.info("{} - Waiting for main-thread...", instance.id); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("{} - Shutdown completed.", instance.id); - })); - - instance.run(); + log.info("{}: Exiting!", id); + } } }