juplo.de Git - demos/kafka/training/commitdiff
Audit-Headers für den `ExampleProducer` implementiert — Branches: producer/simple-producer--auditheaders--2026-03--vor-branchumbenennung--springframework, producer/simple-producer--auditheaders--2026-03-20, producer/simple-producer--auditheaders--2026-03-20--19-06, producer/simple-producer--auditheaders--2026-03-21--smartlifecycle-only, producer/simple-producer--auditheaders--2026-03-22--20-47
author: Kai Moritz <kai@juplo.de>
Fri, 1 Nov 2024 08:55:43 +0000 (09:55 +0100)
committer: Kai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 18:02:24 +0000 (19:02 +0100)
README.sh
build.gradle
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ExampleProducer.java

index 3d98ace799653a8242fa2a3681e8936d7e3a3a78..7cec701be2167785e41ea6b87d72a80a374a2edd 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/simple-producer:1.0-SNAPSHOT
+IMAGE=juplo/simple-producer:1.0-auditheaders-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf producer
+docker compose -f docker/docker-compose.yml rm -svf setup peter ute
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -26,11 +26,14 @@ fi
 docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 
-docker compose -f docker/docker-compose.yml up -d producer
-sleep 5
+docker compose -f docker/docker-compose.yml up -d peter ute
+sleep 10
 
-docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
 
-docker compose -f docker/docker-compose.yml stop producer
-docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-docker compose -f docker/docker-compose.yml logs producer
+echo
+echo "Empfangen:"
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100
+
+echo
+echo "Empfangen (mit Headern!):"
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -J
index 0119074d23651b0cb94964e025cb98abcb8410f2..753852eb16ae5ddeccf0f64cbb972adafc6c5fc0 100644 (file)
@@ -8,7 +8,7 @@ plugins {
 }
 
 group = 'de.juplo.kafka'
-version = '1.0-SNAPSHOT'
+version = '1.0-auditheaders-SNAPSHOT'
 
 java {
        toolchain {
index 2ab5b6cdb6160be39004ce5504684bb2ca346b20..4f0422288eaaa8e15a4a8ed91d71f23c158c5130 100644 (file)
@@ -173,12 +173,32 @@ services:
       - kafka-3
 
   producer:
-    image: juplo/simple-producer:1.0-SNAPSHOT
+    image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT
     command: kafka:9092 test producer
     cpu_period: 100000
     cpu_quota:  50000
     mem_limit:  100m
 
+  foo:
+    image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT
+    command: kafka:9092 test foo
+
+  bar:
+    image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT
+    command: kafka:9092 test bar
+
+  consumer:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: kafka:9092 test my-group consumer
+
+  peter:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: kafka:9092 test my-group peter
+
+  ute:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: kafka:9092 test my-group ute
+
 volumes:
   controller-data:
   kafka-1-data:
diff --git a/pom.xml b/pom.xml
index 2a04e375eee86da46f03a64110650152d95fc1fa..eb309f710b72c7a991887e84946d1138a57481c4 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>simple-producer</artifactId>
   <name>Super Simple Producer</name>
   <description>A Simple Producer, programmed with pure Java, that sends messages via Kafka</description>
-  <version>1.0-SNAPSHOT</version>
+  <version>1.0-auditheaders-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index 60424bc319ab2bbf0ba78b71a44e4fd5b4f0db54..580c060d889a83f75f51a74139dee7ddf6bb428c 100644 (file)
@@ -6,12 +6,16 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.math.BigInteger;
 import java.util.Properties;
 
 
 @Slf4j
 public class ExampleProducer
 {
+  public final static String HEADER_ID = "id";
+  public final static String HEADER_PRODUCED = "#";
+
   private final String id;
   private final String topic;
   private final Producer<String, String> producer;
@@ -71,6 +75,11 @@ public class ExampleProducer
       value   // Value
     );
 
+    record
+      .headers()
+      .add(HEADER_ID, id.getBytes())
+      .add(HEADER_PRODUCED, BigInteger.valueOf(produced).toByteArray());
+
     producer.send(record, (metadata, e) ->
     {
       long sendRequestProcessed = System.currentTimeMillis();