From c0b2d09d6eb4ef3c393723347ee1d775cb1bc9e9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Nov 2024 09:55:43 +0100 Subject: [PATCH] =?utf8?q?Audit-Headers=20f=C3=BCr=20den=20`ExampleProduce?= =?utf8?q?r`=20implementiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 20 +++++++---------- docker/docker-compose.yml | 22 ++++++++++++++----- pom.xml | 2 +- .../java/de/juplo/kafka/ExampleProducer.java | 9 ++++++++ 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/README.sh b/README.sh index 19419b57..2ad543ee 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-backpressure-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-auditheaders-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf setup producer +docker compose -f docker/docker-compose.yml rm -svf setup peter ute if [[ $(docker image ls -q $IMAGE) == "" || @@ -26,18 +26,14 @@ fi docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 -docker compose -f docker/docker-compose.yml up -d producer -sleep 5 +docker compose -f docker/docker-compose.yml up -d peter ute +sleep 10 -docker compose -f docker/docker-compose.yml pause kafka-1 kafka-3 -sleep 20 -docker compose -f docker/docker-compose.yml unpause kafka-1 kafka-3 -sleep 3 - -docker compose -f docker/docker-compose.yml stop producer echo echo "Empfangen:" -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -q -J +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -docker compose -f docker/docker-compose.yml logs producer | grep ERROR +echo +echo "Empfangen (mit Headern!):" +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -J diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c3a90d67..c4dd2876 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -135,15 +135,27 @@ services: - kafka-2 - kafka-3 - producer: - image: juplo/spring-producer:1.0-backpressure-SNAPSHOT + peter: + image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: peter + juplo.producer.topic: test + juplo.producer.linger: 300ms + juplo.producer.throttle: 66ms + juplo.producer.delivery-timeout: 2147483647ms + juplo.producer.max-queue-length: 100 + + ute: + image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 - juplo.client-id: producer + juplo.client-id: ute juplo.producer.topic: test + juplo.producer.lingers: 500ms + juplo.producer.throttle: 30ms juplo.producer.delivery-timeout: 2147483647ms - juplo.producer.max-block: 2147483647ms - juplo.producer.max-queue-length: 10 + juplo.producer.max-queue-length: 50 consumer: image: juplo/simple-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 0360ad39..d2f3ad77 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-backpressure-SNAPSHOT + 1.0-auditheaders-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 7f639aba..12951cb2 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import java.math.BigInteger; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; @@ -11,6 +12,9 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class ExampleProducer implements Runnable { + public final static String HEADER_ID = "id"; + public final static String HEADER_QUEUED = "#"; + private final String id; private final String topic; private final Duration throttle; @@ -107,6 +111,11 @@ public class ExampleProducer implements Runnable int queuedAfterSend = queued.incrementAndGet(); + record + .headers() + .add(HEADER_ID, id.getBytes()) + .add(HEADER_QUEUED, BigInteger.valueOf(queuedAfterSend).toByteArray()); + producer.send(record, (metadata, e) -> { long now = System.currentTimeMillis(); -- 2.20.1