From: Kai Moritz Date: Fri, 1 Nov 2024 08:55:43 +0000 (+0100) Subject: Audit-Headers für den `ExampleProducer` implementiert X-Git-Tag: producer/simple-producer--auditheaders--2025-03-18--19-42 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=b382f70b7a889726bfc0c3443f441b8995ebf9c7;p=demos%2Fkafka%2Ftraining Audit-Headers für den `ExampleProducer` implementiert --- diff --git a/README.sh b/README.sh index 3d98ace7..7cec701b 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/simple-producer:1.0-SNAPSHOT +IMAGE=juplo/simple-producer:1.0-auditheaders-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer +docker compose -f docker/docker-compose.yml rm -svf setup peter ute if [[ $(docker image ls -q $IMAGE) == "" || @@ -26,11 +26,14 @@ fi docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 -docker compose -f docker/docker-compose.yml up -d producer -sleep 5 +docker compose -f docker/docker-compose.yml up -d peter ute +sleep 10 -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml logs producer +echo +echo "Empfangen:" +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 + +echo +echo "Empfangen (mit Headern!):" +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -J diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5b19de74..ece29538 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,9 +136,29 @@ services: - kafka-3 producer: - image: juplo/simple-producer:1.0-SNAPSHOT + image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT command: kafka:9092 test producer + foo: + image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT + command: kafka:9092 test foo + + bar: + image: juplo/simple-producer:1.0-auditheaders-SNAPSHOT + command: kafka:9092 test bar + + consumer: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group consumer + + peter: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group peter + + ute: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group ute + volumes: zookeeper-data: zookeeper-log: diff --git a/pom.xml b/pom.xml index 3da4d59e..fdcdff64 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ simple-producer Super Simple Producer A Simple Producer, programmed with pure Java, that sends messages via Kafka - 1.0-SNAPSHOT + 1.0-auditheaders-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0bab4426..ea24b338 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -6,12 +6,16 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import java.math.BigInteger; import java.util.Properties; @Slf4j public class ExampleProducer { + public final static String HEADER_ID = "id"; + public final static String HEADER_PRODUCED = "#"; + private final String id; private final String topic; private final Producer producer; @@ -71,6 +75,11 @@ public class ExampleProducer value // Value ); + record + .headers() + .add(HEADER_ID, id.getBytes()) + .add(HEADER_PRODUCED, BigInteger.valueOf(produced).toByteArray()); + producer.send(record, (metadata, e) -> { long now = System.currentTimeMillis();