Merge der überarbeiteten Compose-Konfiguration (Branch 'customized') headers---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Sun, 7 Aug 2022 11:40:24 +0000 (13:40 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Aug 2022 13:57:59 +0000 (15:57 +0200)
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/RestProducer.java

index d0a59bd..94a853a 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -25,42 +25,14 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer-0 producer-1
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-0; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-1; sleep 1; done
-docker-compose up -d consumer
+docker-compose up -d producer
 
-echo foo | http -v :8000/foo
-echo foo | http -v :8001/foo
-echo foo | http -v :8001/foo
-echo foo | http -v :8000/bar
-echo foobar | http -v :8000/bar
-echo foofoo | http -v :8000/bar
-echo barbar | http -v :8000/bar
-echo barfoo | http -v :8000/bar
-echo bar | http -v :8000/bar
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-docker-compose logs consumer
-
-docker-compose up -d
-docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 3 to 7..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose restart producer-0 producer-1
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
-
-echo "Messages from peter"
-docker-compose logs consumer | grep k=peter
-echo "Messages from beate"
-docker-compose logs consumer | grep k=beate
-echo "Messages from foo"
-docker-compose logs consumer | grep k=foo
+# tag::http[]
+echo -n bar | http -v :8080/foo
+echo -n foo | http -v :8080/bar X-id:666
+# end::http[]
+# tag::kafkacat[]
+docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e
+# end::kafkacat[]
index 11c5c8d..9265c13 100644 (file)
@@ -73,33 +73,21 @@ services:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer-0:
+  producer:
     image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
-      - 8000:8080
+      - 8080:8080
     environment:
       server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.partition: 0
-
-  producer-1:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8001:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer
-      producer.topic: test
-      producer.partition: 1
 
   peter:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
       server.port: 8080
-      rest-client.baseUrl: http://producer-1:8080
+      rest-client.baseUrl: http://producer:8080
       rest-client.username: peter
       rest-client.throttle-ms: 1000
 
@@ -107,7 +95,7 @@ services:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
       server.port: 8080
-      rest-client.baseUrl: http://producer-1:8080
+      rest-client.baseUrl: http://producer:8080
       rest-client.username: klaus
       rest-client.throttle-ms: 1100
 
@@ -115,7 +103,7 @@ services:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
       server.port: 8080
-      rest-client.baseUrl: http://producer-0:8080
+      rest-client.baseUrl: http://producer:8080
       rest-client.username: beate
       rest-client.throttle-ms: 900
 
@@ -123,7 +111,7 @@ services:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
       server.port: 8080
-      rest-client.baseUrl: http://producer-1:8080
+      rest-client.baseUrl: http://producer:8080
       rest-client.username: franz
       rest-client.throttle-ms: 800
 
@@ -131,7 +119,7 @@ services:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
       server.port: 8080
-      rest-client.baseUrl: http://producer-0:8080
+      rest-client.baseUrl: http://producer:8080
       rest-client.username: uschi
       rest-client.throttle-ms: 1200
 
index b5bb2d9..b430e35 100644 (file)
@@ -9,6 +9,7 @@ import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
+import java.math.BigInteger;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
@@ -62,6 +63,12 @@ public class RestProducer
         value   // Value
     );
 
+    record.headers().add("source", id.getBytes());
+    if (correlationId != null)
+    {
+      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
+    }
+
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();