]> juplo.de Git - demos/kafka/training/commitdiff
Audit-Headers für den `ExampleProducer` implementiert producer/spring-producer--auditheaders producer/spring-producer--auditheaders--2026-03-22--22-01
authorKai Moritz <kai@juplo.de>
Fri, 1 Nov 2024 08:55:43 +0000 (09:55 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:05:53 +0000 (21:05 +0100)
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ExampleProducer.java

index 19419b57f60fb3575cf00fed14f088f6cb72221c..2ad543ee4edac8450d49ccb4ea1b936dc48eb617 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-producer:1.0-backpressure-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-auditheaders-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf setup producer
+docker compose -f docker/docker-compose.yml rm -svf setup peter ute
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -26,18 +26,14 @@ fi
 docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 
-docker compose -f docker/docker-compose.yml up -d producer
-sleep 5
+docker compose -f docker/docker-compose.yml up -d peter ute
+sleep 10
 
-docker compose -f docker/docker-compose.yml pause kafka-1 kafka-3
-sleep 20
-docker compose -f docker/docker-compose.yml unpause kafka-1 kafka-3
-sleep 3
-
-docker compose -f docker/docker-compose.yml stop producer
 
 echo
 echo "Empfangen:"
-docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -q -J
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100
 
-docker compose -f docker/docker-compose.yml logs producer | grep ERROR
+echo
+echo "Empfangen (mit Headern!):"
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -J
index 0708bf07916fb5879158dacf8ecc28152cf1740b..fa8bba8c3cca2958ee18f4f37171d5c343237998 100644 (file)
@@ -173,14 +173,41 @@ services:
       - kafka-3
 
   producer:
-    image: juplo/spring-producer:1.0-backpressure-SNAPSHOT
+    image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: producer
       juplo.producer.topic: test
       juplo.producer.delivery-timeout: 2147483647ms
-      juplo.producer.max-block: 2147483647ms
-      juplo.producer.max-queue-length: 10
+      juplo.producer.max-queue-length: 100
+    cpu_period: 100000
+    cpu_quota:  50000
+    mem_limit:  100m
+
+  foo:
+    image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT
+    environment:
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: foo
+      juplo.producer.topic: test
+      juplo.producer.linger: 300ms
+      juplo.producer.throttle: 66ms
+      juplo.producer.delivery-timeout: 2147483647ms
+      juplo.producer.max-queue-length: 100
+    cpu_period: 100000
+    cpu_quota:  50000
+    mem_limit:  100m
+
+  bar:
+    image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT
+    environment:
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: bar
+      juplo.producer.topic: test
+      juplo.producer.lingers: 500ms
+      juplo.producer.throttle: 30ms
+      juplo.producer.delivery-timeout: 2147483647ms
+      juplo.producer.max-queue-length: 50
     cpu_period: 100000
     cpu_quota:  50000
     mem_limit:  100m
diff --git a/pom.xml b/pom.xml
index 2f6a27ce33f5af8cfbc3b871696337a3d9770a68..8a85c0cbb522e5651fcd9812831d9bf05a4cc7f6 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-producer</artifactId>
   <name>Spring Producer</name>
   <description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
-  <version>1.0-backpressure-SNAPSHOT</version>
+  <version>1.0-auditheaders-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index dc6c1d17bf28e2ead0a9e592a40368f44a6e3051..25b45777f9a1600a33181f95b9dc52b2bf71e64f 100644 (file)
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.math.BigInteger;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -11,6 +12,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Slf4j
 public class ExampleProducer implements Runnable
 {
+  public final static String HEADER_ID = "id";
+  public final static String HEADER_QUEUED = "#";
+
   private final String id;
   private final String topic;
   private final Duration throttle;
@@ -107,6 +111,11 @@ public class ExampleProducer implements Runnable
 
     int queuedAfterSend = queued.incrementAndGet();
 
+    record
+      .headers()
+      .add(HEADER_ID, id.getBytes())
+      .add(HEADER_QUEUED, BigInteger.valueOf(queuedAfterSend).toByteArray());
+
     producer.send(record, (metadata, e) ->
     {
       long sendRequestProcessed = System.currentTimeMillis();