#!/bin/bash
-IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+IMAGE=juplo/round-robin:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up setup
+docker-compose up -d kafka-ui
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Creating topic with 3 partitions..."
+kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+# tag::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
+# end::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose up -d producer
+sleep 10
+docker-compose stop producer
+docker-compose up count-0
+docker-compose up count-1
+docker-compose up count-2
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 3 to 7..."
+# tag::altertopic[]
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+# end::altertopic[]
+EOF
+
docker-compose up -d producer
-sleep 5
+sleep 10
+docker-compose stop producer
+docker-compose up count-0
+docker-compose up count-1
+docker-compose up count-2
+docker-compose up count-3
+docker-compose up count-4
+docker-compose up count-5
+docker-compose up count-6
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Recreating topic with 7 partitions..."
+# tag::recreatetopic[]
+kafka-topics --bootstrap-server kafka:9092 --delete --topic test
+kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 7
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+# end::recreatetopic[]
+EOF
+
+docker-compose up -d producer
+sleep 10
+docker-compose stop producer
+docker-compose up count-0
+docker-compose up count-1
+docker-compose up count-2
+docker-compose up count-3
+docker-compose up count-4
+docker-compose up count-5
+docker-compose up count-6
+
docker-compose stop producer
-docker-compose logs producer
depends_on:
- zookeeper
- setup:
- image: juplo/toolbox
- command: >
- bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
- kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
- "
+ kafka-ui:
+ image: provectuslabs/kafka-ui:0.2.1
+ ports:
+ - 8080:8080
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
cli:
image: juplo/toolbox
command: sleep infinity
producer:
- image: juplo/endless-producer:1.0-SNAPSHOT
+ image: juplo/round-robin:1.0-SNAPSHOT
ports:
- - 8080:8080
+ - 8000:8080
environment:
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.throttle-ms: 200
+ producer.throttle-ms: 10
+
+ count-0:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 0 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-1:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 1 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-2:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 2 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-3:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 3 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-4:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 4 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-5:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 5 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-6:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 6 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
+
+ count-7:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic test \
+ --partition 7 \
+ --property print.key=true \
+ --property print.value=false \
+ --from-beginning \
+ --timeout-ms 1000 2>/dev/null | sort | uniq -c
+ "
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>endless-producer</artifactId>
- <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+ <artifactId>round-robin</artifactId>
+ <name>Round Robin: a version of the Endless Producer, that uses round robin for partitioning</name>
<version>1.0-SNAPSHOT</version>
<dependencies>
props.put("bootstrap.servers", bootstrapServer);
props.put("client.id", clientId);
props.put("acks", acks);
+ props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());