echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up setup
+docker-compose up -d kafka-ui
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Creating topic with 3 partitions..."
+kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+# tag::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
+# end::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose up -d producer-0 producer-1 consumer
+
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
+while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
+
+echo foo | http -v :8000/foo
+echo foo | http -v :8001/foo
+
+sleep 5
+
+http -v :8081/seen
+
docker-compose up -d
-sleep 15
+sleep 5
-echo foo | http -v :8080/bar
-dd if=/dev/zero bs=1024 count=1024 | http -v :8080/fehler
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 3 to 7..."
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose restart producer-0 producer-1
+
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
+
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
http -v :8081/seen
-docker-compose stop producer consumer
-docker-compose logs producer
+docker-compose stop
depends_on:
- zookeeper
- setup:
- image: juplo/toolbox
- command: >
- bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
- kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
- "
+ kafka-ui:
+ image: provectuslabs/kafka-ui:0.3.3
+ ports:
+ - 8080:8080
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
cli:
image: juplo/toolbox
command: sleep infinity
- producer:
+ producer-0:
image: juplo/rest-producer:1.0-SNAPSHOT
ports:
- - 8080:8080
+ - 8000:8080
+ environment:
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer
+ producer.topic: test
+ producer.partition: 0
+
+ producer-1:
+ image: juplo/rest-producer:1.0-SNAPSHOT
+ ports:
+ - 8001:8080
environment:
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
+ producer.partition: 1
+
+ peter:
+ image: juplo/rest-client:1.0-SNAPSHOT
+ environment:
+ rest-client.baseUrl: http://producer-1:8080
+ rest-client.username: peter
+ rest-client.throttle-ms: 1000
+
+ klaus:
+ image: juplo/rest-client:1.0-SNAPSHOT
+ environment:
+ rest-client.baseUrl: http://producer-1:8080
+ rest-client.username: klaus
+ rest-client.throttle-ms: 1100
+
+ beate:
+ image: juplo/rest-client:1.0-SNAPSHOT
+ environment:
+ rest-client.baseUrl: http://producer-0:8080
+ rest-client.username: beate
+ rest-client.throttle-ms: 900
+
+ franz:
+ image: juplo/rest-client:1.0-SNAPSHOT
+ environment:
+ rest-client.baseUrl: http://producer-1:8080
+ rest-client.username: franz
+ rest-client.throttle-ms: 800
+
+ uschi:
+ image: juplo/rest-client:1.0-SNAPSHOT
+ environment:
+ rest-client.baseUrl: http://producer-0:8080
+ rest-client.username: uschi
+ rest-client.throttle-ms: 1200
consumer:
image: juplo/counting-consumer:1.0-SNAPSHOT
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
+
@ConfigurationProperties(prefix = "producer")
@Getter
@Setter
private String bootstrapServer;
private String clientId;
private String topic;
+ private Integer partition;
private String acks;
private Integer batchSize;
private Integer lingerMs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
@Slf4j
{
private final String id;
private final String topic;
+ private final Integer partition;
private final KafkaProducer<String, String> producer;
private long produced = 0;
{
this.id = properties.getClientId();
this.topic = properties.getTopic();
+ this.partition = properties.getPartition();
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
final ProducerRecord<String, String> record = new ProducerRecord<>(
topic, // Topic
+ partition, // Partition
key, // Key
value // Value
);