From dc838e6bc7a5324176e65f54a530ccc61c412929 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Apr 2022 17:15:21 +0200 Subject: [PATCH] Der Rest-Producer schreibt in eine fixe Partition, wenn so konfiguriert --- README.sh | 81 +++++++++++++++++-- docker-compose.yml | 64 ++++++++++++--- .../de/juplo/kafka/ApplicationProperties.java | 2 + .../java/de/juplo/kafka/RestProducer.java | 5 +- 4 files changed, 135 insertions(+), 17 deletions(-) diff --git a/README.sh b/README.sh index 698d6dd..ece13d0 100755 --- a/README.sh +++ b/README.sh @@ -24,14 +24,83 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up setup +docker-compose up -d kafka-ui + +docker-compose exec -T cli bash << 'EOF' +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF + +docker-compose up -d producer-0 producer-1 consumer + +while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done +while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done +while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done + +echo foo | http -v :8000/foo +echo foo | http -v :8001/foo + +sleep 5 + +http -v :8081/seen + docker-compose up -d -sleep 15 +sleep 5 -echo foo | http -v :8080/bar -dd if=/dev/zero bs=1024 count=1024 | http -v :8080/fehler +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen + +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 7..." +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF + +docker-compose restart producer-0 producer-1 + +while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done +while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done + +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 http -v :8081/seen -docker-compose stop producer consumer -docker-compose logs producer +docker-compose stop diff --git a/docker-compose.yml b/docker-compose.yml index 336c459..6993f6c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,26 +24,72 @@ services: depends_on: - zookeeper - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + kafka-ui: + image: provectuslabs/kafka-ui:0.3.3 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox command: sleep infinity - producer: + producer-0: image: juplo/rest-producer:1.0-SNAPSHOT ports: - - 8080:8080 + - 8000:8080 + environment: + producer.bootstrap-server: kafka:9092 + producer.client-id: producer + producer.topic: test + producer.partition: 0 + + producer-1: + image: juplo/rest-producer:1.0-SNAPSHOT + ports: + - 8001:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test + producer.partition: 1 + + peter: + image: juplo/rest-client:1.0-SNAPSHOT + environment: + rest-client.baseUrl: http://producer-1:8080 + rest-client.username: peter + rest-client.throttle-ms: 1000 + + klaus: + image: juplo/rest-client:1.0-SNAPSHOT + environment: + rest-client.baseUrl: http://producer-1:8080 + rest-client.username: klaus + rest-client.throttle-ms: 1100 + + beate: + image: juplo/rest-client:1.0-SNAPSHOT + environment: + rest-client.baseUrl: http://producer-0:8080 + rest-client.username: beate + rest-client.throttle-ms: 900 + + franz: + image: juplo/rest-client:1.0-SNAPSHOT + environment: + rest-client.baseUrl: http://producer-1:8080 + rest-client.username: franz + rest-client.throttle-ms: 800 + + uschi: + image: juplo/rest-client:1.0-SNAPSHOT + environment: + rest-client.baseUrl: http://producer-0:8080 + rest-client.username: uschi + rest-client.throttle-ms: 1200 consumer: image: juplo/counting-consumer:1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 1f30262..78b6085 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -4,6 +4,7 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; + @ConfigurationProperties(prefix = "producer") @Getter @Setter @@ -12,6 +13,7 @@ public class ApplicationProperties private String bootstrapServer; private String clientId; private String topic; + private Integer partition; private String acks; private Integer batchSize; private Integer lingerMs; diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index dea49f0..ac9a541 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -4,14 +4,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PreDestroy; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; @Slf4j @@ -20,6 +18,7 @@ public class RestProducer { private final String id; private final String topic; + private final Integer partition; private final KafkaProducer producer; private long produced = 0; @@ -28,6 +27,7 @@ public class RestProducer { this.id = properties.getClientId(); this.topic = properties.getTopic(); + this.partition = properties.getPartition(); Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -55,6 +55,7 @@ public class RestProducer final ProducerRecord record = new ProducerRecord<>( topic, // Topic + partition, // Partition key, // Key value // Value ); -- 2.20.1