From a324ca46e9f2df888a62fda37189341ccef806cf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 Dec 2021 21:00:23 +0100 Subject: [PATCH] Transactional Producer: a producer, that uses transactions --- README.sh | 10 ++-- docker-compose.yml | 47 ++++++++++++++++--- pom.xml | 2 +- src/main/java/de/juplo/kafka/Application.java | 1 + .../de/juplo/kafka/ApplicationProperties.java | 1 + .../java/de/juplo/kafka/EndlessProducer.java | 25 +++++++++- src/main/resources/application.yml | 3 +- 7 files changed, 73 insertions(+), 16 deletions(-) diff --git a/README.sh b/README.sh index de08fae..0544297 100755 --- a/README.sh +++ b/README.sh @@ -25,11 +25,7 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d producer peter +docker-compose up -d producer sleep 5 -docker-compose stop producer peter -docker-compose exec -T cli bash << 'EOF' -# tag::kcat[] -kafkacat -C -b kafka:9092 -t test -o beginning -f'key: %k, headers: %h, value: %s\n' -e -# end::kcat[] -EOF +docker-compose stop producer +docker-compose logs producer diff --git a/docker-compose.yml b/docker-compose.yml index 6f38efc..669491c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,8 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 ports: - 9092:9082 - 9082:9082 @@ -44,14 +46,47 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + producer.throttle-ms: 100 peter: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/endless-consumer:1.0-SNAPSHOT ports: - 8081:8080 environment: - producer.bootstrap-server: kafka:9092 - producer.client-id: peter - producer.topic: test - producer.throttle-ms: 666 + server.port: 8080 + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: peter + consumer.topic: test + + beate: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8082:8080 + environment: + server.port: 8080 + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: beate + consumer.topic: test + + franz: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8083:8080 + environment: + server.port: 8080 + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: franz + consumer.topic: test + + klaus: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8084:8080 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: klaus + consumer.topic: test diff --git a/pom.xml b/pom.xml index 7028cfd..2e2d64f 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ de.juplo.kafka endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + Transactional Producer: a producer, that uses transactions 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index bc617a8..d38b4ac 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -32,6 +32,7 @@ public class Application properties.getClientId(), properties.getTopic(), properties.getAcks(), + properties.getCommitIntervalMs(), properties.getThrottleMs()); producer.start(); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index ab26890..54b5a32 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -13,5 +13,6 @@ public class ApplicationProperties private String clientId; private String topic; private String acks; + private int commitIntervalMs; private int throttleMs; } diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 1620693..65fb25a 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -21,12 +21,14 @@ public class EndlessProducer implements Runnable private final String id; private final String topic; private final String acks; + private final int commitIntervalMs; private final int throttleMs; private final KafkaProducer producer; private boolean running = false; private long i = 0; private long produced = 0; + private long lastCommit; private Future future = null; public EndlessProducer( @@ -35,16 +37,19 @@ public class EndlessProducer implements Runnable String clientId, String topic, String acks, + int commitIntervalMs, int throttleMs) { this.executor = executor; this.id = clientId; this.topic = topic; this.acks = acks; + this.commitIntervalMs = commitIntervalMs; this.throttleMs = throttleMs; Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServer); + props.put("transactional.id", clientId); props.put("client.id", clientId); props.put("acks", acks); props.put("key.serializer", StringSerializer.class.getName()); @@ -58,6 +63,12 @@ public class EndlessProducer implements Runnable { try { + producer.initTransactions(); + + lastCommit = System.currentTimeMillis(); + log.info("{} - Beginning transaction", id); + producer.beginTransaction(); + for (; running; i++) { final long time = System.currentTimeMillis(); @@ -114,6 +125,15 @@ public class EndlessProducer implements Runnable now - time ); + if (now - lastCommit >= commitIntervalMs) + { + log.info("{} - Commiting transaction", id); + producer.commitTransaction(); + lastCommit = now; + log.info("{} - Beginning new transaction", id); + producer.beginTransaction(); + } + if (throttleMs > 0) { try @@ -127,11 +147,14 @@ public class EndlessProducer implements Runnable } } + log.info("{} - Commiting transaction", id); + producer.commitTransaction(); log.info("{} - Done", id); } catch (Exception e) { - + log.info("{} - Aborting transaction", id); + producer.abortTransaction(); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e4ae52a..b8652df 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,7 +2,8 @@ producer: bootstrap-server: :9092 client-id: peter topic: test - acks: 1 + acks: all + commit-interval-ms: 2000 throttle-ms: 1000 management: endpoints: -- 2.20.1