EndlessProducer arbeitet jetzt transaktional endless-stream-producer--transactional
authorKai Moritz <kai@juplo.de>
Tue, 6 Jun 2023 21:09:29 +0000 (23:09 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 Jun 2023 21:56:04 +0000 (23:56 +0200)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/EndlessProducer.java
src/main/resources/application.yml

index 73ceebc..71877bd 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+IMAGE=juplo/transactional-producer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -26,6 +26,6 @@ echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
 docker-compose up -d producer
-sleep 5
+sleep 15
 docker-compose stop producer
 docker-compose logs producer
index a368379..aff2b24 100644 (file)
@@ -18,6 +18,8 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
     ports:
       - 9092:9082
       - 9082:9082
diff --git a/pom.xml b/pom.xml
index af4a89f..21526ac 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,8 +12,8 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-producer</artifactId>
-  <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+  <artifactId>transactional-producer</artifactId>
+  <name>Endless Producer: a Simple Producer that endlessly writes transactionally grouped batches of numbers into a topic</name>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
index bc617a8..d38b4ac 100644 (file)
@@ -32,6 +32,7 @@ public class Application
             properties.getClientId(),
             properties.getTopic(),
             properties.getAcks(),
+            properties.getCommitIntervalMs(),
             properties.getThrottleMs());
 
     producer.start();
index ab26890..54b5a32 100644 (file)
@@ -13,5 +13,6 @@ public class ApplicationProperties
   private String clientId;
   private String topic;
   private String acks;
+  private int commitIntervalMs;
   private int throttleMs;
 }
index 7a5b324..a31652b 100644 (file)
@@ -17,12 +17,14 @@ public class EndlessProducer implements Runnable
   private final ExecutorService executor;
   private final String id;
   private final String topic;
+  private final int commitIntervalMs;
   private final int throttleMs;
   private final KafkaProducer<String, String> producer;
 
   private boolean running = false;
   private long i = 0;
   private long produced = 0;
+  private long lastCommit;
 
   public EndlessProducer(
       ExecutorService executor,
@@ -30,15 +32,18 @@ public class EndlessProducer implements Runnable
       String clientId,
       String topic,
       String acks,
+      int commitIntervalMs,
       int throttleMs)
   {
     this.executor = executor;
     this.id = clientId;
     this.topic = topic;
+    this.commitIntervalMs = commitIntervalMs;
     this.throttleMs = throttleMs;
 
     Properties props = new Properties();
     props.put("bootstrap.servers", bootstrapServer);
+    props.put("transactional.id", clientId);
     props.put("client.id", clientId);
     props.put("acks", acks);
     props.put("key.serializer", StringSerializer.class.getName());
@@ -52,6 +57,12 @@ public class EndlessProducer implements Runnable
   {
     try
     {
+      producer.initTransactions();
+
+      lastCommit = System.currentTimeMillis();
+      log.info("{} - Beginning transaction", id);
+      producer.beginTransaction();
+
       for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
@@ -67,13 +78,27 @@ public class EndlessProducer implements Runnable
             log.warn("{} - Interrupted while throttling!", e);
           }
         }
+
+        long now = System.currentTimeMillis();
+        if (now - lastCommit >= commitIntervalMs)
+        {
+          log.info("{} - Commiting transaction", id);
+          producer.commitTransaction();
+          lastCommit = now;
+          log.info("{} - Beginning new transaction", id);
+          producer.beginTransaction();
+        }
       }
 
+      log.info("{} - Commiting transaction", id);
+      producer.commitTransaction();
       log.info("{} - Done", id);
     }
     catch (Exception e)
     {
       log.error("{} - Unexpected Exception:", id, e);
+      log.info("{} - Aborting transaction", id);
+      producer.abortTransaction();
     }
     finally
     {
index 7dd385b..e6d3f63 100644 (file)
@@ -2,7 +2,8 @@ producer:
   bootstrap-server: :9092
   client-id: DEV
   topic: test
-  acks: 1
+  acks: all
+  commit-interval-ms: 2000
   throttle-ms: 1000
 management:
   endpoint: