Der REST-Producer ergänzt Header (Header `X-id` und seine `client.id`)
authorKai Moritz <kai@juplo.de>
Sun, 3 Apr 2022 17:43:21 +0000 (19:43 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Apr 2022 18:22:11 +0000 (20:22 +0200)
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/RestProducer.java

index ece13d0..c8aa4dc 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -24,83 +24,10 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose up -d producer-0 producer-1 consumer
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
-
-echo foo | http -v :8000/foo
-echo foo | http -v :8001/foo
-
-sleep 5
-
-http -v :8081/seen
-
+docker-compose up setup
 docker-compose up -d
+while ! [[ $(http -b :8080/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8080/actuator/health; sleep 1; done
 
-sleep 5
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 3 to 7..."
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose restart producer-0 producer-1
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop
+echo -n bar | http -v :8080/foo
+echo -n foo | http -v :8080/bar X-id:666
+docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e
index 6993f6c..b3a8b13 100644 (file)
@@ -24,79 +24,23 @@ services:
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+  setup:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+      "
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer-0:
+  producer:
     image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
-      - 8000:8080
-    environment:
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer
-      producer.topic: test
-      producer.partition: 0
-
-  producer-1:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8001:8080
+      - 8080:8080
     environment:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.partition: 1
-
-  peter:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      rest-client.baseUrl: http://producer-1:8080
-      rest-client.username: peter
-      rest-client.throttle-ms: 1000
-
-  klaus:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      rest-client.baseUrl: http://producer-1:8080
-      rest-client.username: klaus
-      rest-client.throttle-ms: 1100
-
-  beate:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      rest-client.baseUrl: http://producer-0:8080
-      rest-client.username: beate
-      rest-client.throttle-ms: 900
-
-  franz:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      rest-client.baseUrl: http://producer-1:8080
-      rest-client.username: franz
-      rest-client.throttle-ms: 800
-
-  uschi:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      rest-client.baseUrl: http://producer-0:8080
-      rest-client.username: uschi
-      rest-client.throttle-ms: 1200
-
-  consumer:
-    image: juplo/counting-consumer:1.0-SNAPSHOT
-    ports:
-      - 8081:8081
-    environment:
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: my-group
-      consumer.client-id: consumer
-      consumer.topic: test
index ac9a541..408cd2f 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
+import java.math.BigInteger;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
@@ -47,6 +48,7 @@ public class RestProducer
   @PostMapping(path = "{key}")
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
+      @RequestHeader(name = "X-id", required = false) Long correlationId,
       @RequestBody String value)
   {
     DeferredResult<ProduceResult> result = new DeferredResult<>();
@@ -60,6 +62,12 @@ public class RestProducer
         value   // Value
     );
 
+    record.headers().add("source", id.getBytes());
+    if (correlationId != null)
+    {
+      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
+    }
+
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();