echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose up -d producer-0 producer-1 consumer
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
-
-echo foo | http -v :8000/foo
-echo foo | http -v :8001/foo
-
-sleep 5
-
-http -v :8081/seen
-
+docker-compose up setup
docker-compose up -d
+while ! [[ $(http -b :8080/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8080/actuator/health; sleep 1; done
-sleep 5
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 3 to 7..."
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose restart producer-0 producer-1
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop
+echo -n bar | http -v :8080/foo
+echo -n foo | http -v :8080/bar X-id:666
+docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e
depends_on:
- zookeeper
- kafka-ui:
- image: provectuslabs/kafka-ui:0.3.3
- ports:
- - 8080:8080
- environment:
- KAFKA_CLUSTERS_0_NAME: local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+ setup:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+ "
cli:
image: juplo/toolbox
command: sleep infinity
- producer-0:
+ producer:
image: juplo/rest-producer:1.0-SNAPSHOT
ports:
- - 8000:8080
- environment:
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer
- producer.topic: test
- producer.partition: 0
-
- producer-1:
- image: juplo/rest-producer:1.0-SNAPSHOT
- ports:
- - 8001:8080
+ - 8080:8080
environment:
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.partition: 1
-
- peter:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- rest-client.baseUrl: http://producer-1:8080
- rest-client.username: peter
- rest-client.throttle-ms: 1000
-
- klaus:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- rest-client.baseUrl: http://producer-1:8080
- rest-client.username: klaus
- rest-client.throttle-ms: 1100
-
- beate:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- rest-client.baseUrl: http://producer-0:8080
- rest-client.username: beate
- rest-client.throttle-ms: 900
-
- franz:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- rest-client.baseUrl: http://producer-1:8080
- rest-client.username: franz
- rest-client.throttle-ms: 800
-
- uschi:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- rest-client.baseUrl: http://producer-0:8080
- rest-client.username: uschi
- rest-client.throttle-ms: 1200
-
- consumer:
- image: juplo/counting-consumer:1.0-SNAPSHOT
- ports:
- - 8081:8081
- environment:
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: my-group
- consumer.client-id: consumer
- consumer.topic: test
import org.springframework.web.context.request.async.DeferredResult;
import javax.annotation.PreDestroy;
+import java.math.BigInteger;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@PostMapping(path = "{key}")
public DeferredResult<ProduceResult> send(
@PathVariable String key,
+ @RequestHeader(name = "X-id", required = false) Long correlationId,
@RequestBody String value)
{
DeferredResult<ProduceResult> result = new DeferredResult<>();
value // Value
);
+ record.headers().add("source", id.getBytes());
+ if (correlationId != null)
+ {
+ record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
+ }
+
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();