echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d producer peter
+docker-compose up -d producer
sleep 5
-docker-compose stop producer peter
-docker-compose exec -T cli bash << 'EOF'
-# tag::kcat[]
-kafkacat -C -b kafka:9092 -t test -o beginning -f'key: %k, headers: %h, value: %s\n' -e
-# end::kcat[]
-EOF
+docker-compose stop producer
+docker-compose logs producer
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
ports:
- 9092:9082
- 9082:9082
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.throttle-ms: 200
+ producer.throttle-ms: 100
peter:
- image: juplo/endless-producer:1.0-SNAPSHOT
+ image: juplo/endless-consumer:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
- producer.bootstrap-server: kafka:9092
- producer.client-id: peter
- producer.topic: test
- producer.throttle-ms: 666
+ server.port: 8080
+ consumer.bootstrap-server: kafka:9092
+ consumer.group-id: my-group
+ consumer.client-id: peter
+ consumer.topic: test
+
+ beate:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
+ ports:
+ - 8082:8080
+ environment:
+ server.port: 8080
+ consumer.bootstrap-server: kafka:9092
+ consumer.group-id: my-group
+ consumer.client-id: beate
+ consumer.topic: test
+
+ franz:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
+ ports:
+ - 8083:8080
+ environment:
+ server.port: 8080
+ consumer.bootstrap-server: kafka:9092
+ consumer.group-id: my-group
+ consumer.client-id: franz
+ consumer.topic: test
+
+ klaus:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
+ ports:
+ - 8084:8080
+ environment:
+ consumer.bootstrap-server: kafka:9092
+ consumer.group-id: my-group
+ consumer.client-id: klaus
+ consumer.topic: test
<groupId>de.juplo.kafka</groupId>
<artifactId>endless-producer</artifactId>
- <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+ <name>Transactional Producer: a producer, that uses transactions</name>
<version>1.0-SNAPSHOT</version>
<dependencies>
properties.getClientId(),
properties.getTopic(),
properties.getAcks(),
+ properties.getCommitIntervalMs(),
properties.getThrottleMs());
producer.start();
private String clientId;
private String topic;
private String acks;
+ private int commitIntervalMs;
private int throttleMs;
}
private final String id;
private final String topic;
private final String acks;
+ private final int commitIntervalMs;
private final int throttleMs;
private final KafkaProducer<String, String> producer;
private boolean running = false;
private long i = 0;
private long produced = 0;
+ private long lastCommit;
private Future<?> future = null;
public EndlessProducer(
String clientId,
String topic,
String acks,
+ int commitIntervalMs,
int throttleMs)
{
this.executor = executor;
this.id = clientId;
this.topic = topic;
this.acks = acks;
+ this.commitIntervalMs = commitIntervalMs;
this.throttleMs = throttleMs;
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);
+ props.put("transactional.id", clientId);
props.put("client.id", clientId);
props.put("acks", acks);
props.put("key.serializer", StringSerializer.class.getName());
{
try
{
+ producer.initTransactions();
+
+ lastCommit = System.currentTimeMillis();
+ log.info("{} - Beginning transaction", id);
+ producer.beginTransaction();
+
for (; running; i++)
{
final long time = System.currentTimeMillis();
now - time
);
+ if (now - lastCommit >= commitIntervalMs)
+ {
+ log.info("{} - Commiting transaction", id);
+ producer.commitTransaction();
+ lastCommit = now;
+ log.info("{} - Beginning new transaction", id);
+ producer.beginTransaction();
+ }
+
if (throttleMs > 0)
{
try
}
}
+ log.info("{} - Commiting transaction", id);
+ producer.commitTransaction();
log.info("{} - Done", id);
}
catch (Exception e)
{
-
+ log.info("{} - Aborting transaction", id);
+ producer.abortTransaction();
}
}
bootstrap-server: :9092
client-id: peter
topic: test
- acks: 1
+ acks: all
+ commit-interval-ms: 2000
throttle-ms: 1000
management:
endpoints: