From: Kai Moritz Date: Tue, 1 Apr 2025 17:51:54 +0000 (+0200) Subject: Vorlage für das Live-Coding zum `simple-producer` X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=fd38f05b25d078a8c5daa870142f20092015f05f;p=demos%2Fkafka%2Ftraining Vorlage für das Live-Coding zum `simple-producer` --- diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0bab4426..41eedb81 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,143 +12,51 @@ import java.util.Properties; @Slf4j public class ExampleProducer { - private final String id; - private final String topic; - private final Producer producer; + private String id; + private String topic; + private Producer producer; private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; - public ExampleProducer( - String broker, - String topic, - String clientId) + public static void main(String[] args) throws Exception { + String broker = "localhost:9092"; + String clientId = "DEV"; + String topic = "test"; + Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("client.id", clientId); // Nur zur Wiedererkennung props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - this.id = clientId; - this.topic = topic; - producer = new KafkaProducer<>(props); - } + Producer producer = new KafkaProducer<>(props); - public void run() - { + String id = clientId; long i = 0; + boolean running = true; + boolean done = false; + long produced = 0; try { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + Long.toString(i%10), // Key + Long.toString(i) // Value + ); + + producer.send(record); Thread.sleep(500); } } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); - done = true; } } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - now - time - ); - } - - - public static void main(String[] args) throws Exception - { - if (args.length != 3) - { - log.error("Three arguments required!"); - log.error("arg[0]: Broker-Address"); - log.error("arg[1]: Topic"); - log.error("arg[2]: Unique Client-ID"); - System.exit(1); - return; - } - - log.info( - "Running ExampleProducer: broker={}, topic={}, client-id={}", - args[0], - args[1], - args[2]); - - ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.running = false; - while (!instance.done) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - instance.run(); - } }