From d7743828914d3e5b54720a4f584c1337646c1b7a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 Dec 2021 20:52:22 +0100 Subject: [PATCH] Manual Partitioning: partitioning modul 2 --- README.sh | 39 +++++- docker-compose.yml | 132 ++++++++++++++++-- pom.xml | 4 +- .../java/de/juplo/kafka/EndlessProducer.java | 1 + 4 files changed, 160 insertions(+), 16 deletions(-) diff --git a/README.sh b/README.sh index 0544297..9e47432 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/manual-partitioning:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -24,8 +24,39 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up setup +docker-compose up -d kafka-ui + +docker-compose exec -T cli bash << 'EOF' +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF + +docker-compose up -d producer +sleep 10 +docker-compose stop producer +docker-compose up count-0 +docker-compose up count-1 +docker-compose up count-2 + +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 5..." +# tag::altertopic[] +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 5 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::altertopic[] +EOF + docker-compose up -d producer -sleep 5 +sleep 10 +docker-compose stop producer +docker-compose up count-0 +docker-compose up count-1 +docker-compose up count-2 +docker-compose up count-3 +docker-compose up count-4 + docker-compose stop producer -docker-compose logs producer diff --git a/docker-compose.yml b/docker-compose.yml index 10ad3a0..5ce6d76 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,24 +24,136 @@ services: depends_on: - zookeeper - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + kafka-ui: + image: provectuslabs/kafka-ui:0.2.1 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/manual-partitioning:1.0-SNAPSHOT ports: - - 8080:8080 + - 8000:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + producer.throttle-ms: 10 + + count-0: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 0 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-1: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 1 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-2: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 2 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-3: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 3 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-4: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 4 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-5: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 5 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-6: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 6 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-7: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 7 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " diff --git a/pom.xml b/pom.xml index 7028cfd..a6228d5 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + manual-partitioning + Manual Partitioning: a version of the Endless Producer with fixed partitioning 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 43b0e41..1881487 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -62,6 +62,7 @@ public class EndlessProducer implements Runnable final ProducerRecord record = new ProducerRecord<>( topic, // Topic + (int)i%2, // Partition Long.toString(i % 10), // Key Long.toString(i) // Value ); -- 2.20.1