From 59e6285162b2b441f68c25ba4c969802faf44a2d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 Dec 2021 20:41:16 +0100 Subject: [PATCH] Round Robin: the producer uses round-robin for partitioning --- README.sh | 61 +++++++- docker-compose.yml | 132 ++++++++++++++++-- pom.xml | 4 +- .../java/de/juplo/kafka/EndlessProducer.java | 1 + 4 files changed, 182 insertions(+), 16 deletions(-) diff --git a/README.sh b/README.sh index 0544297..9190bf0 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/round-robin:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -24,8 +24,61 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up setup +docker-compose up -d kafka-ui + +docker-compose exec -T cli bash << 'EOF' +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF + +docker-compose up -d producer +sleep 10 +docker-compose stop producer +docker-compose up count-0 +docker-compose up count-1 +docker-compose up count-2 + +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 7..." +# tag::altertopic[] +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::altertopic[] +EOF + docker-compose up -d producer -sleep 5 +sleep 10 +docker-compose stop producer +docker-compose up count-0 +docker-compose up count-1 +docker-compose up count-2 +docker-compose up count-3 +docker-compose up count-4 +docker-compose up count-5 +docker-compose up count-6 + +docker-compose exec -T cli bash << 'EOF' +echo "Recreating topic with 7 partitions..." +# tag::recreatetopic[] +kafka-topics --bootstrap-server kafka:9092 --delete --topic test +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::recreatetopic[] +EOF + +docker-compose up -d producer +sleep 10 +docker-compose stop producer +docker-compose up count-0 +docker-compose up count-1 +docker-compose up count-2 +docker-compose up count-3 +docker-compose up count-4 +docker-compose up count-5 +docker-compose up count-6 + docker-compose stop producer -docker-compose logs producer diff --git a/docker-compose.yml b/docker-compose.yml index 10ad3a0..c2c9bd6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,24 +24,136 @@ services: depends_on: - zookeeper - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + kafka-ui: + image: provectuslabs/kafka-ui:0.2.1 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/round-robin:1.0-SNAPSHOT ports: - - 8080:8080 + - 8000:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + producer.throttle-ms: 10 + + count-0: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 0 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-1: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 1 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-2: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 2 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-3: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 3 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-4: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 4 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-5: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 5 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-6: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 6 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " + + count-7: + image: juplo/toolbox + command: > + bash -c " + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic test \ + --partition 7 \ + --property print.key=true \ + --property print.value=false \ + --from-beginning \ + --timeout-ms 1000 2>/dev/null | sort | uniq -c + " diff --git a/pom.xml b/pom.xml index 7028cfd..9741146 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + round-robin + Round Robin: a version of the Endless Producer, that uses round robin for partitioning 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 43b0e41..6b42954 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -45,6 +45,7 @@ public class EndlessProducer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("client.id", clientId); props.put("acks", acks); + props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); -- 2.20.1