Der Rest-Producer schreibt in eine fixe Partition, wenn so konfiguriert
authorKai Moritz <kai@juplo.de>
Sun, 3 Apr 2022 15:15:21 +0000 (17:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Apr 2022 16:25:32 +0000 (18:25 +0200)
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/RestProducer.java

index 698d6dd..ece13d0 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -24,14 +24,83 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up setup
+docker-compose up -d kafka-ui
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Creating topic with 3 partitions..."
+kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+# tag::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
+# end::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose up -d producer-0 producer-1 consumer
+
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
+while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
+
+echo foo | http -v :8000/foo
+echo foo | http -v :8001/foo
+
+sleep 5
+
+http -v :8081/seen
+
 docker-compose up -d
 
-sleep 15
+sleep 5
 
-echo foo | http -v :8080/bar
-dd if=/dev/zero bs=1024 count=1024  | http -v :8080/fehler
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 3 to 7..."
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose restart producer-0 producer-1
+
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
+
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
 http -v :8081/seen
 
-docker-compose stop producer consumer
-docker-compose logs producer
+docker-compose stop
index 336c459..6993f6c 100644 (file)
@@ -24,26 +24,72 @@ services:
     depends_on:
       - zookeeper
 
-  setup:
-    image: juplo/toolbox
-    command: >
-      bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
-      "
+  kafka-ui:
+    image: provectuslabs/kafka-ui:0.3.3
+    ports:
+      - 8080:8080
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer:
+  producer-0:
     image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
-      - 8080:8080
+      - 8000:8080
+    environment:
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer
+      producer.topic: test
+      producer.partition: 0
+
+  producer-1:
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8001:8080
     environment:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
+      producer.partition: 1
+
+  peter:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      rest-client.baseUrl: http://producer-1:8080
+      rest-client.username: peter
+      rest-client.throttle-ms: 1000
+
+  klaus:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      rest-client.baseUrl: http://producer-1:8080
+      rest-client.username: klaus
+      rest-client.throttle-ms: 1100
+
+  beate:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      rest-client.baseUrl: http://producer-0:8080
+      rest-client.username: beate
+      rest-client.throttle-ms: 900
+
+  franz:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      rest-client.baseUrl: http://producer-1:8080
+      rest-client.username: franz
+      rest-client.throttle-ms: 800
+
+  uschi:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      rest-client.baseUrl: http://producer-0:8080
+      rest-client.username: uschi
+      rest-client.throttle-ms: 1200
 
   consumer:
     image: juplo/counting-consumer:1.0-SNAPSHOT
index 1f30262..78b6085 100644 (file)
@@ -4,6 +4,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+
 @ConfigurationProperties(prefix = "producer")
 @Getter
 @Setter
@@ -12,6 +13,7 @@ public class ApplicationProperties
   private String bootstrapServer;
   private String clientId;
   private String topic;
+  private Integer partition;
   private String acks;
   private Integer batchSize;
   private Integer lingerMs;
index dea49f0..ac9a541 100644 (file)
@@ -4,14 +4,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 
 
 @Slf4j
@@ -20,6 +18,7 @@ public class RestProducer
 {
   private final String id;
   private final String topic;
+  private final Integer partition;
   private final KafkaProducer<String, String> producer;
 
   private long produced = 0;
@@ -28,6 +27,7 @@ public class RestProducer
   {
     this.id = properties.getClientId();
     this.topic = properties.getTopic();
+    this.partition = properties.getPartition();
 
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -55,6 +55,7 @@ public class RestProducer
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
         topic,  // Topic
+        partition, // Partition
         key,    // Key
         value   // Value
     );