From: Kai Moritz Date: Sat, 23 Jul 2022 10:48:22 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration ('setup-partitionierung') X-Git-Tag: round-robin-DEPRECATED X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=053b38adcd1607bf4ddac5bd7039bd05dd3f55c7;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration ('setup-partitionierung') * Diese Übung war zuvor noch nicht auf den letzten Stand (gemeinsames Basis-Setup 'setup-partitionierung' angehoben worden. * Hintergrund: `RoundRobinPartitioner` ist veraltet! --- 053b38adcd1607bf4ddac5bd7039bd05dd3f55c7 diff --cc README.sh index 9190bf0,3ec2781..f00b917 --- a/README.sh +++ b/README.sh @@@ -35,12 -35,16 +35,16 @@@ kafka-topics --bootstrap-server kafka:9 kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF - docker-compose up -d producer + docker-compose up -d producer consumer + sleep 10 - docker-compose stop producer - docker-compose up count-0 - docker-compose up count-1 - docker-compose up count-2 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen docker-compose exec -T cli bash << 'EOF' echo "Altering number of partitions from 3 to 7..." @@@ -50,35 -54,37 +54,37 @@@ kafka-topics --bootstrap-server kafka:9 # end::altertopic[] EOF - docker-compose up -d producer - sleep 10 - docker-compose stop producer - docker-compose up count-0 - docker-compose up count-1 - docker-compose up count-2 - docker-compose up count-3 - docker-compose up count-4 - docker-compose up count-5 - docker-compose up count-6 + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen - docker-compose exec -T cli bash << 'EOF' - echo "Recreating topic with 7 partitions..." - # tag::recreatetopic[] - kafka-topics --bootstrap-server kafka:9092 --delete --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 7 - kafka-topics --bootstrap-server kafka:9092 --describe --topic test - # end::recreatetopic[] - EOF -http post :8081/stop -http post :8081/start ++http -v post :8081/stop ++http -v post :8081/start - docker-compose up -d producer - sleep 10 - docker-compose stop producer - docker-compose up count-0 - docker-compose up count-1 - docker-compose up count-2 - docker-compose up count-3 - docker-compose up count-4 - docker-compose up count-5 - docker-compose up count-6 + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen + sleep 1 -http :8081/seen ++http -v :8081/seen - docker-compose stop producer + docker-compose stop producer consumer diff --cc docker-compose.yml index c2c9bd6,de99240..4ace0e1 --- a/docker-compose.yml +++ b/docker-compose.yml @@@ -46,114 -47,14 +47,14 @@@ services producer.topic: test producer.throttle-ms: 10 - count-0: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 0 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - - count-1: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 1 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - - count-2: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 2 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - - count-3: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 3 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - - count-4: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 4 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - - count-5: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 5 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - - count-6: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 6 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " - count-7: - image: juplo/toolbox - command: > - bash -c " - kafka-console-consumer \ - --bootstrap-server kafka:9092 \ - --topic test \ - --partition 7 \ - --property print.key=true \ - --property print.value=false \ - --from-beginning \ - --timeout-ms 1000 2>/dev/null | sort | uniq -c - " + consumer: - image: juplo/counting-consumer:1.0-SNAPSHOT ++ image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8081:8080 + environment: + server.port: 8080 + consumer.bootstrap-server: kafka:9092 + consumer.client-id: my-group + consumer.client-id: consumer + consumer.topic: test diff --cc src/main/java/de/juplo/kafka/EndlessProducer.java index 6b42954,8b3743d..fcf42de --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@@ -45,7 -41,7 +41,8 @@@ public class EndlessProducer implement props.put("bootstrap.servers", bootstrapServer); props.put("client.id", clientId); props.put("acks", acks); + props.put("metadata.max.age.ms", "1000"); + props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName());