echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d producer
+docker-compose up -d producer peter
sleep 5
-docker-compose stop producer
-docker-compose logs producer
+docker-compose stop producer peter
+docker-compose exec -T cli bash << 'EOF'
+# tag::kcat[]
+kafkacat -C -b kafka:9092 -t test -o beginning -f'key: %k, headers: %h, value: %s\n' -e
+# end::kcat[]
+EOF
producer.client-id: producer
producer.topic: test
producer.throttle-ms: 200
+
+ peter:
+ image: juplo/endless-producer:1.0-SNAPSHOT
+ ports:
+ - 8081:8080
+ environment:
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: peter
+ producer.topic: test
+ producer.throttle-ms: 666
import org.apache.kafka.common.serialization.StringSerializer;
import javax.annotation.PreDestroy;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Long.toString(i) // Value
);
+ record
+ .headers()
+ .add("client", id.getBytes())
+ .add("i", BigInteger.valueOf(i).toByteArray()); // Printed as String by kafkacat
+
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();