From 8dfb868399595061e62bc929e0bf98e3420d0086 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 6 Jun 2023 23:09:29 +0200 Subject: [PATCH] EndlessProducer arbeitet jetzt transaktional --- README.sh | 4 +-- docker-compose.yml | 2 ++ pom.xml | 4 +-- src/main/java/de/juplo/kafka/Application.java | 1 + .../de/juplo/kafka/ApplicationProperties.java | 1 + .../java/de/juplo/kafka/EndlessProducer.java | 25 +++++++++++++++++++ src/main/resources/application.yml | 3 ++- 7 files changed, 35 insertions(+), 5 deletions(-) diff --git a/README.sh b/README.sh index 73ceebc..71877bd 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/transactional-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -26,6 +26,6 @@ echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup docker-compose up -d producer -sleep 5 +sleep 15 docker-compose stop producer docker-compose logs producer diff --git a/docker-compose.yml b/docker-compose.yml index a368379..aff2b24 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,8 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 ports: - 9092:9082 - 9082:9082 diff --git a/pom.xml b/pom.xml index af4a89f..21526ac 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + transactional-producer + Endless Producer: a Simple Producer that endlessly writes transactionally grouped batches of numbers into a topic 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index bc617a8..d38b4ac 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -32,6 +32,7 @@ public class Application properties.getClientId(), properties.getTopic(), properties.getAcks(), + properties.getCommitIntervalMs(), properties.getThrottleMs()); producer.start(); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index ab26890..54b5a32 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -13,5 +13,6 @@ public class ApplicationProperties private String clientId; private String topic; private String acks; + private int commitIntervalMs; private int throttleMs; } diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 7a5b324..a31652b 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -17,12 +17,14 @@ public class EndlessProducer implements Runnable private final ExecutorService executor; private final String id; private final String topic; + private final int commitIntervalMs; private final int throttleMs; private final KafkaProducer producer; private boolean running = false; private long i = 0; private long produced = 0; + private long lastCommit; public EndlessProducer( ExecutorService executor, @@ -30,15 +32,18 @@ public class EndlessProducer implements Runnable String clientId, String topic, String acks, + int commitIntervalMs, int throttleMs) { this.executor = executor; this.id = clientId; this.topic = topic; + this.commitIntervalMs = commitIntervalMs; this.throttleMs = throttleMs; Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServer); + props.put("transactional.id", clientId); props.put("client.id", clientId); props.put("acks", acks); props.put("key.serializer", StringSerializer.class.getName()); @@ -52,6 +57,12 @@ public class EndlessProducer implements Runnable { try { + producer.initTransactions(); + + lastCommit = System.currentTimeMillis(); + log.info("{} - Beginning transaction", id); + producer.beginTransaction(); + for (; running; i++) { send(Long.toString(i%10), Long.toString(i)); @@ -67,13 +78,27 @@ public class EndlessProducer implements Runnable log.warn("{} - Interrupted while throttling!", e); } } + + long now = System.currentTimeMillis(); + if (now - lastCommit >= commitIntervalMs) + { + log.info("{} - Commiting transaction", id); + producer.commitTransaction(); + lastCommit = now; + log.info("{} - Beginning new transaction", id); + producer.beginTransaction(); + } } + log.info("{} - Commiting transaction", id); + producer.commitTransaction(); log.info("{} - Done", id); } catch (Exception e) { log.error("{} - Unexpected Exception:", id, e); + log.info("{} - Aborting transaction", id); + producer.abortTransaction(); } finally { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7dd385b..e6d3f63 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,7 +2,8 @@ producer: bootstrap-server: :9092 client-id: DEV topic: test - acks: 1 + acks: all + commit-interval-ms: 2000 throttle-ms: 1000 management: endpoint: -- 2.20.1