From 25a3ac66cab49ebe30e2124a44625b56afb161ff Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 Dec 2021 20:13:41 +0100 Subject: [PATCH] Headers: a producer with audit-headers --- README.sh | 10 +++++++--- docker-compose.yml | 10 ++++++++++ src/main/java/de/juplo/kafka/EndlessProducer.java | 7 +++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/README.sh b/README.sh index 0544297..de08fae 100755 --- a/README.sh +++ b/README.sh @@ -25,7 +25,11 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d producer +docker-compose up -d producer peter sleep 5 -docker-compose stop producer -docker-compose logs producer +docker-compose stop producer peter +docker-compose exec -T cli bash << 'EOF' +# tag::kcat[] +kafkacat -C -b kafka:9092 -t test -o beginning -f'key: %k, headers: %h, value: %s\n' -e +# end::kcat[] +EOF diff --git a/docker-compose.yml b/docker-compose.yml index 10ad3a0..6f38efc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,3 +45,13 @@ services: producer.client-id: producer producer.topic: test producer.throttle-ms: 200 + + peter: + image: juplo/endless-producer:1.0-SNAPSHOT + ports: + - 8081:8080 + environment: + producer.bootstrap-server: kafka:9092 + producer.client-id: peter + producer.topic: test + producer.throttle-ms: 666 diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 43b0e41..1620693 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -6,6 +6,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import javax.annotation.PreDestroy; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -66,6 +68,11 @@ public class EndlessProducer implements Runnable Long.toString(i) // Value ); + record + .headers() + .add("client", id.getBytes()) + .add("i", BigInteger.valueOf(i).toByteArray()); // Printed as String by kafkacat + producer.send(record, (metadata, e) -> { long now = System.currentTimeMillis(); -- 2.20.1