Headers: a producer with audit-headers
authorKai Moritz <kai@juplo.de>
Tue, 14 Dec 2021 19:13:41 +0000 (20:13 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 14 Dec 2021 19:39:43 +0000 (20:39 +0100)
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/EndlessProducer.java

index 0544297..de08fae 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -25,7 +25,11 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer
+docker-compose up -d producer peter
 sleep 5
-docker-compose stop producer
-docker-compose logs producer
+docker-compose stop producer peter
+docker-compose exec -T cli bash << 'EOF'
+# tag::kcat[]
+kafkacat -C -b kafka:9092 -t test -o beginning -f'key: %k, headers: %h, value: %s\n' -e
+# end::kcat[]
+EOF
index 10ad3a0..6f38efc 100644 (file)
@@ -45,3 +45,13 @@ services:
       producer.client-id: producer
       producer.topic: test
       producer.throttle-ms: 200
+
+  peter:
+    image: juplo/endless-producer:1.0-SNAPSHOT
+    ports:
+      - 8081:8080
+    environment:
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: peter
+      producer.topic: test
+      producer.throttle-ms: 666
index 43b0e41..1620693 100644 (file)
@@ -6,6 +6,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import javax.annotation.PreDestroy;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -66,6 +68,11 @@ public class EndlessProducer implements Runnable
             Long.toString(i)       // Value
         );
 
+        record
+            .headers()
+            .add("client", id.getBytes())
+            .add("i", BigInteger.valueOf(i).toByteArray()); // Printed as String by kafkacat
+
         producer.send(record, (metadata, e) ->
         {
           long now = System.currentTimeMillis();