Überarbeitungen von Setup/Übung aus Branch 'rest-producer' gemerged customized---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 19:48:31 +0000 (21:48 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 19:48:31 +0000 (21:48 +0200)
* Überarbeitungen an die Übung 'customized' angepasst.

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml

index ece13d0..d0a59bd 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -9,7 +9,7 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -24,83 +24,43 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose up -d producer-0 producer-1 consumer
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
+docker-compose up setup
+docker-compose up -d producer-0 producer-1
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-0; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-1; sleep 1; done
+docker-compose up -d consumer
 
 echo foo | http -v :8000/foo
 echo foo | http -v :8001/foo
+echo foo | http -v :8001/foo
+echo foo | http -v :8000/bar
+echo foobar | http -v :8000/bar
+echo foofoo | http -v :8000/bar
+echo barbar | http -v :8000/bar
+echo barfoo | http -v :8000/bar
+echo bar | http -v :8000/bar
 
-sleep 5
-
-http -v :8081/seen
+docker-compose logs consumer
 
 docker-compose up -d
-
-sleep 5
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
 
 docker-compose exec -T cli bash << 'EOF'
 echo "Altering number of partitions from 3 to 7..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
 kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
 docker-compose restart producer-0 producer-1
-
 while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
 while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
+
+echo "Messages from peter"
+docker-compose logs consumer | grep k=peter
+echo "Messages from beate"
+docker-compose logs consumer | grep k=beate
+echo "Messages from foo"
+docker-compose logs consumer | grep k=foo
index fe1ad42..11c5c8d 100644 (file)
@@ -7,30 +7,67 @@ services:
     ports:
       - 2181:2181
 
-  kafka:
+  kafka-1:
     image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9081:9081
+    depends_on:
+      - zookeeper
+
+  kafka-2:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
+    networks:
+      default:
+        aliases:
+          - kafka
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
+  kafka-3:
+    image: confluentinc/cp-kafka:7.1.3
     environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9083:9083
+    depends_on:
+      - zookeeper
+
+  setup:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+      "
 
   cli:
     image: juplo/toolbox
@@ -99,12 +136,5 @@ services:
       rest-client.throttle-ms: 1200
 
   consumer:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
-    ports:
-      - 8081:8080
-    environment:
-      server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: my-group
-      consumer.client-id: consumer
-      consumer.topic: test
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
index 6e4236b..b5bb2d9 100644 (file)
@@ -48,6 +48,7 @@ public class RestProducer
   @PostMapping(path = "{key}")
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
+      @RequestHeader(name = "X-id", required = false) Long correlationId,
       @RequestBody String value)
   {
     DeferredResult<ProduceResult> result = new DeferredResult<>();
@@ -97,9 +98,8 @@ public class RestProducer
 
     long now = System.currentTimeMillis();
     log.trace(
-        "{} - Queued #{} key={} latency={}ms",
+        "{} - Queued message with key={} latency={}ms",
         id,
-        value,
         record.key(),
         now - time
     );
index 726204e..0d5752c 100644 (file)
@@ -31,6 +31,6 @@ info:
 logging:
   level:
     root: INFO
-    de.juplo: DEBUG
+    de.juplo: TRACE
 server:
   port: 8880