From: Kai Moritz Date: Fri, 8 Apr 2022 11:25:36 +0000 (+0200) Subject: Merge branch 'endless-stream-producer' into HEAD X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d7c9ce7f84fad4ca5860b43c2825c2aeb5bca430;hp=958835b95ba4edd3f299d8be42e510fe750e6356;p=demos%2Fkafka%2Ftraining Merge branch 'endless-stream-producer' into HEAD --- diff --git a/README.sh b/README.sh index 73ceebc..3ec2781 100755 --- a/README.sh +++ b/README.sh @@ -24,8 +24,67 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up setup -docker-compose up -d producer -sleep 5 -docker-compose stop producer -docker-compose logs producer +docker-compose up -d kafka-ui + +docker-compose exec -T cli bash << 'EOF' +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF + +docker-compose up -d producer consumer + +sleep 10 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen + +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 7..." +# tag::altertopic[] +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::altertopic[] +EOF + +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen + +http post :8081/stop +http post :8081/start + +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen + +docker-compose stop producer consumer diff --git a/docker-compose.yml b/docker-compose.yml index d85269a..15e0c4c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,13 +24,13 @@ services: depends_on: - zookeeper - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + kafka-ui: + image: provectuslabs/kafka-ui:0.3.3 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox @@ -39,9 +39,20 @@ services: producer: image: juplo/endless-producer:1.0-SNAPSHOT ports: - - 8080:8080 + - 8000:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + producer.throttle-ms: 10 + + + consumer: + image: juplo/counting-consumer:1.0-SNAPSHOT + ports: + - 8081:8081 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.client-id: my-group + consumer.client-id: consumer + consumer.topic: test diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 7a5b324..8b3743d 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -41,6 +41,7 @@ public class EndlessProducer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("client.id", clientId); props.put("acks", acks); + props.put("metadata.max.age.ms", "1000"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName());