From 403e5c4aaf388f3108b63bea65a15bce20c37cf9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Mar 2026 09:59:14 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20(f=C3=BCr=20das=20Bauen=20des=20Produc?= =?utf8?q?ers=20als=20Docker-Image)?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 36 --------- .../java/de/juplo/kafka/ExampleProducer.java | 73 +++---------------- 2 files changed, 11 insertions(+), 98 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 3d98ace7..00000000 --- a/README.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/simple-producer:1.0-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -sleep 5 - -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' - -docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml logs producer diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index a7645889..1cf50a6a 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -25,15 +25,15 @@ public class ExampleProducer String topic, String clientId) { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + // TODO: + // Instanziierung des KafkaProducer aus Ihrer bisherigen Implementierung + // Hier: + // - Den Parameter "broker" des Konstruktors für "bootstrap.servers" verwenden + // - Den Parameter "topic" des Konstruktors im Attribut "this.topic" merken + // - Den erzeugten KafkaProducer in "this.producer" ablegen this.id = clientId; this.topic = topic; - producer = new KafkaProducer<>(props); } public void run() @@ -44,8 +44,11 @@ public class ExampleProducer { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); + // Versenden der Nachrichten aus Ihrer bisherigen Implementierung + // Hier: + // - ACHTUNG: Schreiben Sie in das Topic "this.topic" + // - Ergänzen Sie mit "log.info()" eine Log-Meldung, die gesendete Nachrichten ausgibt + // - Zählen Sie außerdem die versendeten Nachrichten in "this.produced" } } catch (Exception e) @@ -54,65 +57,11 @@ public class ExampleProducer } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); done = true; } } - void send(String key, String value) - { - final long sendRequested = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long sendRequestProcessed = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - sendRequestProcessed - sendRequested - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - sendRequestProcessed - sendRequested, - e.toString() - ); - } - }); - - long sendRequestQueued = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - sendRequestQueued - sendRequested - ); - } - public static void main(String[] args) throws Exception { -- 2.39.5