From b61195f531e312897ff2677a63601cb9b17b1bcb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Apr 2022 19:43:21 +0200 Subject: [PATCH] =?utf8?q?Der=20REST-Producer=20erg=C3=A4nzt=20Header=20(H?= =?utf8?q?eader=20`X-id`=20und=20seine=20`client.id`)?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 83 ++----------------- docker-compose.yml | 74 ++--------------- .../java/de/juplo/kafka/RestProducer.java | 8 ++ 3 files changed, 22 insertions(+), 143 deletions(-) diff --git a/README.sh b/README.sh index ece13d0..c8aa4dc 100755 --- a/README.sh +++ b/README.sh @@ -24,83 +24,10 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up -d kafka-ui - -docker-compose exec -T cli bash << 'EOF' -echo "Creating topic with 3 partitions..." -kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test -# tag::createtopic[] -kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 -# end::createtopic[] -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -EOF - -docker-compose up -d producer-0 producer-1 consumer - -while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done -while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done -while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done - -echo foo | http -v :8000/foo -echo foo | http -v :8001/foo - -sleep 5 - -http -v :8081/seen - +docker-compose up setup docker-compose up -d +while ! [[ $(http -b :8080/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8080/actuator/health; sleep 1; done -sleep 5 - -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen - -docker-compose exec -T cli bash << 'EOF' -echo "Altering number of partitions from 3 to 7..." -kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -EOF - -docker-compose restart producer-0 producer-1 - -while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done -while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done - -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen - -docker-compose stop +echo -n bar | http -v :8080/foo +echo -n foo | http -v :8080/bar X-id:666 +docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e diff --git a/docker-compose.yml b/docker-compose.yml index 6993f6c..b3a8b13 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,79 +24,23 @@ services: depends_on: - zookeeper - kafka-ui: - image: provectuslabs/kafka-ui:0.3.3 - ports: - - 8080:8080 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + setup: + image: juplo/toolbox + command: > + bash -c " + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test + kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 + " cli: image: juplo/toolbox command: sleep infinity - producer-0: + producer: image: juplo/rest-producer:1.0-SNAPSHOT ports: - - 8000:8080 - environment: - producer.bootstrap-server: kafka:9092 - producer.client-id: producer - producer.topic: test - producer.partition: 0 - - producer-1: - image: juplo/rest-producer:1.0-SNAPSHOT - ports: - - 8001:8080 + - 8080:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.partition: 1 - - peter: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - rest-client.baseUrl: http://producer-1:8080 - rest-client.username: peter - rest-client.throttle-ms: 1000 - - klaus: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - rest-client.baseUrl: http://producer-1:8080 - rest-client.username: klaus - rest-client.throttle-ms: 1100 - - beate: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - rest-client.baseUrl: http://producer-0:8080 - rest-client.username: beate - rest-client.throttle-ms: 900 - - franz: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - rest-client.baseUrl: http://producer-1:8080 - rest-client.username: franz - rest-client.throttle-ms: 800 - - uschi: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - rest-client.baseUrl: http://producer-0:8080 - rest-client.username: uschi - rest-client.throttle-ms: 1200 - - consumer: - image: juplo/counting-consumer:1.0-SNAPSHOT - ports: - - 8081:8081 - environment: - consumer.bootstrap-server: kafka:9092 - consumer.client-id: my-group - consumer.client-id: consumer - consumer.topic: test diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index ac9a541..408cd2f 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PreDestroy; +import java.math.BigInteger; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -47,6 +48,7 @@ public class RestProducer @PostMapping(path = "{key}") public DeferredResult send( @PathVariable String key, + @RequestHeader(name = "X-id", required = false) Long correlationId, @RequestBody String value) { DeferredResult result = new DeferredResult<>(); @@ -60,6 +62,12 @@ public class RestProducer value // Value ); + record.headers().add("source", id.getBytes()); + if (correlationId != null) + { + record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray()); + } + producer.send(record, (metadata, e) -> { long now = System.currentTimeMillis(); -- 2.20.1