Überarbeitungen von Setup/Übung aus Branch 'rest-producer' gemerged customized---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 19:48:31 +0000 (21:48 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 19:48:31 +0000 (21:48 +0200)
* Überarbeitungen an die Übung 'customized' angepasst.

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/RestProducer.java

index d2dccf8..d0a59bd 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -25,38 +25,42 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d
+docker-compose up -d producer-0 producer-1
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-0; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-1; sleep 1; done
+docker-compose up -d consumer
+
+echo foo | http -v :8000/foo
+echo foo | http -v :8001/foo
+echo foo | http -v :8001/foo
+echo foo | http -v :8000/bar
+echo foobar | http -v :8000/bar
+echo foofoo | http -v :8000/bar
+echo barbar | http -v :8000/bar
+echo barfoo | http -v :8000/bar
+echo bar | http -v :8000/bar
 
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-
-echo foo | http -v :8080/bar
-echo bar | http -v :8080/foo
-echo foobar | http -v :8080/bar
-dd if=/dev/zero bs=1024 count=1024  | http -v :8080/bar
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-
-docker-compose logs producer
 docker-compose logs consumer
+
+docker-compose up -d
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 3 to 7..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose restart producer-0 producer-1
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
+
+echo "Messages from peter"
+docker-compose logs consumer | grep k=peter
+echo "Messages from beate"
+docker-compose logs consumer | grep k=beate
+echo "Messages from foo"
+docker-compose logs consumer | grep k=foo
index 7ae8d9b..11c5c8d 100644 (file)
@@ -73,15 +73,67 @@ services:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer:
+  producer-0:
     image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
-      - 8080:8080
+      - 8000:8080
     environment:
       server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
+      producer.partition: 0
+
+  producer-1:
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8001:8080
+    environment:
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer
+      producer.topic: test
+      producer.partition: 1
+
+  peter:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer-1:8080
+      rest-client.username: peter
+      rest-client.throttle-ms: 1000
+
+  klaus:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer-1:8080
+      rest-client.username: klaus
+      rest-client.throttle-ms: 1100
+
+  beate:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer-0:8080
+      rest-client.username: beate
+      rest-client.throttle-ms: 900
+
+  franz:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer-1:8080
+      rest-client.username: franz
+      rest-client.throttle-ms: 800
+
+  uschi:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer-0:8080
+      rest-client.username: uschi
+      rest-client.throttle-ms: 1200
 
   consumer:
     image: juplo/toolbox
index c74f588..78b6085 100644 (file)
@@ -4,6 +4,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+
 @ConfigurationProperties(prefix = "producer")
 @Getter
 @Setter
index 59d2c77..b5bb2d9 100644 (file)
@@ -57,6 +57,7 @@ public class RestProducer
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
         topic,  // Topic
+        partition, // Partition
         key,    // Key
         value   // Value
     );