Überarbeitungen von Setup/Übung aus Branch 'rest-producer' gemerged customized---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 19:48:31 +0000 (21:48 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 19:48:31 +0000 (21:48 +0200)
* Überarbeitungen an die Übung 'customized' angepasst.

1  2 
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/RestProducer.java

diff --combined README.sh
+++ b/README.sh
@@@ -9,7 -9,7 +9,7 @@@ the
    exit
  fi
  
- docker-compose up -d zookeeper kafka cli
+ docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
  
  if [[
    $(docker image ls -q $IMAGE) == "" ||
@@@ -24,83 -24,39 +24,43 @@@ f
  
  echo "Waiting for the Kafka-Cluster to become ready..."
  docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
- docker-compose up -d kafka-ui
- docker-compose exec -T cli bash << 'EOF'
- echo "Creating topic with 3 partitions..."
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
- # tag::createtopic[]
- kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
- # end::createtopic[]
- kafka-topics --bootstrap-server kafka:9092 --describe --topic test
- EOF
- docker-compose up -d producer-0 producer-1 consumer
- while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
- while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
- while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
+ docker-compose up setup
 -docker-compose up -d
++docker-compose up -d producer-0 producer-1
++while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-0; sleep 1; done
++while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-1; sleep 1; done
++docker-compose up -d consumer
 +
 +echo foo | http -v :8000/foo
 +echo foo | http -v :8001/foo
++echo foo | http -v :8001/foo
++echo foo | http -v :8000/bar
++echo foobar | http -v :8000/bar
++echo foofoo | http -v :8000/bar
++echo barbar | http -v :8000/bar
++echo barfoo | http -v :8000/bar
++echo bar | http -v :8000/bar
  
- sleep 5
 -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
--
- http -v :8081/seen
 -echo foo | http -v :8080/bar
 -echo bar | http -v :8080/foo
 -echo foobar | http -v :8080/bar
 -dd if=/dev/zero bs=1024 count=1024  | http -v :8080/bar
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -echo foofoo | http -v :8080/bar
 -echo barbar | http -v :8080/foo
 -
 -docker-compose logs producer
+ docker-compose logs consumer
 +
 +docker-compose up -d
- sleep 5
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
++docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
 +
 +docker-compose exec -T cli bash << 'EOF'
 +echo "Altering number of partitions from 3 to 7..."
++kafka-topics --bootstrap-server kafka:9092 --describe --topic test
++kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 +EOF
 +
 +docker-compose restart producer-0 producer-1
 +while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
 +while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- sleep 1
- http -v :8081/seen
- docker-compose stop
++docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
++
++echo "Messages from peter"
++docker-compose logs consumer | grep k=peter
++echo "Messages from beate"
++docker-compose logs consumer | grep k=beate
++echo "Messages from foo"
++docker-compose logs consumer | grep k=foo
diff --combined docker-compose.yml
@@@ -7,104 -7,82 +7,134 @@@ services
      ports:
        - 2181:2181
  
-   kafka:
+   kafka-1:
      image: confluentinc/cp-kafka:7.1.3
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+       KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+     ports:
+       - 9081:9081
+     depends_on:
+       - zookeeper
+   kafka-2:
+     image: confluentinc/cp-kafka:7.1.3
+     environment:
+       KAFKA_BROKER_ID: 2
+       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-       KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+       KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
        KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
        KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      ports:
        - 9092:9082
        - 9082:9082
+     networks:
+       default:
+         aliases:
+           - kafka
      depends_on:
        - zookeeper
  
-   kafka-ui:
-     image: provectuslabs/kafka-ui:0.3.3
-     ports:
-       - 8080:8080
+   kafka-3:
+     image: confluentinc/cp-kafka:7.1.3
      environment:
-       KAFKA_CLUSTERS_0_NAME: local
-       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+       KAFKA_BROKER_ID: 3
+       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+       KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+     ports:
+       - 9083:9083
+     depends_on:
+       - zookeeper
+   setup:
+     image: juplo/toolbox
+     command: >
+       bash -c "
+         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+         kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+         kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+       "
  
    cli:
      image: juplo/toolbox
      command: sleep infinity
  
 -  producer:
 +  producer-0:
      image: juplo/rest-producer:1.0-SNAPSHOT
      ports:
 -      - 8080:8080
 +      - 8000:8080
      environment:
        server.port: 8080
        producer.bootstrap-server: kafka:9092
        producer.client-id: producer
        producer.topic: test
 +      producer.partition: 0
 +
 +  producer-1:
 +    image: juplo/rest-producer:1.0-SNAPSHOT
 +    ports:
 +      - 8001:8080
 +    environment:
 +      server.port: 8080
 +      producer.bootstrap-server: kafka:9092
 +      producer.client-id: producer
 +      producer.topic: test
 +      producer.partition: 1
 +
 +  peter:
 +    image: juplo/rest-client:1.0-SNAPSHOT
 +    environment:
 +      server.port: 8080
 +      rest-client.baseUrl: http://producer-1:8080
 +      rest-client.username: peter
 +      rest-client.throttle-ms: 1000
 +
 +  klaus:
 +    image: juplo/rest-client:1.0-SNAPSHOT
 +    environment:
 +      server.port: 8080
 +      rest-client.baseUrl: http://producer-1:8080
 +      rest-client.username: klaus
 +      rest-client.throttle-ms: 1100
 +
 +  beate:
 +    image: juplo/rest-client:1.0-SNAPSHOT
 +    environment:
 +      server.port: 8080
 +      rest-client.baseUrl: http://producer-0:8080
 +      rest-client.username: beate
 +      rest-client.throttle-ms: 900
 +
 +  franz:
 +    image: juplo/rest-client:1.0-SNAPSHOT
 +    environment:
 +      server.port: 8080
 +      rest-client.baseUrl: http://producer-1:8080
 +      rest-client.username: franz
 +      rest-client.throttle-ms: 800
 +
 +  uschi:
 +    image: juplo/rest-client:1.0-SNAPSHOT
 +    environment:
 +      server.port: 8080
 +      rest-client.baseUrl: http://producer-0:8080
 +      rest-client.username: uschi
 +      rest-client.throttle-ms: 1200
  
    consumer:
-     image: juplo/endless-consumer:1.0-SNAPSHOT
-     ports:
-       - 8081:8080
-     environment:
-       server.port: 8080
-       consumer.bootstrap-server: kafka:9092
-       consumer.client-id: my-group
-       consumer.client-id: consumer
-       consumer.topic: test
+     image: juplo/toolbox
+     command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
@@@ -48,6 -48,7 +48,7 @@@ public class RestProduce
    @PostMapping(path = "{key}")
    public DeferredResult<ProduceResult> send(
        @PathVariable String key,
+       @RequestHeader(name = "X-id", required = false) Long correlationId,
        @RequestBody String value)
    {
      DeferredResult<ProduceResult> result = new DeferredResult<>();
@@@ -56,7 -57,6 +57,7 @@@
  
      final ProducerRecord<String, String> record = new ProducerRecord<>(
          topic,  // Topic
 +        partition, // Partition
          key,    // Key
          value   // Value
      );
@@@ -97,9 -97,8 +98,8 @@@
  
      long now = System.currentTimeMillis();
      log.trace(
-         "{} - Queued #{} key={} latency={}ms",
+         "{} - Queued message with key={} latency={}ms",
          id,
-         value,
          record.key(),
          now - time
      );