From: Kai Moritz Date: Fri, 1 Nov 2024 08:55:43 +0000 (+0100) Subject: Audit-Headers für den `ExampleProducer` implementiert X-Git-Tag: producer/simple-producer--auditheaders--2026-03--vor-branchumbenennung--springframework X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=0940d78650a6d12a6f0aac878749553902c3737f;p=demos%2Fkafka%2Ftraining Audit-Headers für den `ExampleProducer` implementiert --- diff --git a/README.sh b/README.sh index 3d98ace7..7cec701b 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/simple-producer:1.0-SNAPSHOT +IMAGE=juplo/simple-producer:1.0-auditheaders-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer +docker compose -f docker/docker-compose.yml rm -svf setup peter ute if [[ $(docker image ls -q $IMAGE) == "" || @@ -26,11 +26,14 @@ fi docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 -docker compose -f docker/docker-compose.yml up -d producer -sleep 5 +docker compose -f docker/docker-compose.yml up -d peter ute +sleep 10 -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml logs producer +echo +echo "Empfangen:" +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 + +echo +echo "Empfangen (mit Headern!):" +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -J diff --git a/build.gradle b/build.gradle index 0119074d..753852eb 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-SNAPSHOT' +version = '1.0-auditheaders-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2ab5b6cd..4f042228 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -173,12 +173,32 @@ services: - kafka-3 producer: - image: juplo/simple-producer:1.0-SNAPSHOT + image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT command: kafka:9092 test producer cpu_period: 100000 cpu_quota: 50000 mem_limit: 100m + foo: + image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT + command: kafka:9092 test foo + + bar: + image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT + command: kafka:9092 test bar + + consumer: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group consumer + + peter: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group peter + + ute: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group ute + volumes: controller-data: kafka-1-data: diff --git a/pom.xml b/pom.xml index 2a04e375..eb309f71 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ simple-producer Super Simple Producer A Simple Producer, programmed with pure Java, that sends messages via Kafka - 1.0-SNAPSHOT + 1.0-auditheaders-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 60424bc3..580c060d 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -6,12 +6,16 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import java.math.BigInteger; import java.util.Properties; @Slf4j public class ExampleProducer { + public final static String HEADER_ID = "id"; + public final static String HEADER_PRODUCED = "#"; + private final String id; private final String topic; private final Producer producer; @@ -71,6 +75,11 @@ public class ExampleProducer value // Value ); + record + .headers() + .add(HEADER_ID, id.getBytes()) + .add(HEADER_PRODUCED, BigInteger.valueOf(produced).toByteArray()); + producer.send(record, (metadata, e) -> { long sendRequestProcessed = System.currentTimeMillis();