Transactional Producer: a producer, that uses transactions transactional-producer--alt
authorKai Moritz <kai@juplo.de>
Tue, 14 Dec 2021 20:00:23 +0000 (21:00 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 14 Dec 2021 20:00:23 +0000 (21:00 +0100)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/EndlessProducer.java
src/main/resources/application.yml

index de08fae..0544297 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -25,11 +25,7 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer peter
+docker-compose up -d producer
 sleep 5
-docker-compose stop producer peter
-docker-compose exec -T cli bash << 'EOF'
-# tag::kcat[]
-kafkacat -C -b kafka:9092 -t test -o beginning -f'key: %k, headers: %h, value: %s\n' -e
-# end::kcat[]
-EOF
+docker-compose stop producer
+docker-compose logs producer
index 6f38efc..669491c 100644 (file)
@@ -18,6 +18,8 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
     ports:
       - 9092:9082
       - 9082:9082
@@ -44,14 +46,47 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 200
+      producer.throttle-ms: 100
 
   peter:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: peter
-      producer.topic: test
-      producer.throttle-ms: 666
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.group-id: my-group
+      consumer.client-id: peter
+      consumer.topic: test
+
+  beate:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8082:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.group-id: my-group
+      consumer.client-id: beate
+      consumer.topic: test
+
+  franz:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8083:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.group-id: my-group
+      consumer.client-id: franz
+      consumer.topic: test
+
+  klaus:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8084:8080
+    environment:
+      consumer.bootstrap-server: kafka:9092
+      consumer.group-id: my-group
+      consumer.client-id: klaus
+      consumer.topic: test
diff --git a/pom.xml b/pom.xml
index 7028cfd..2e2d64f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,7 @@
 
   <groupId>de.juplo.kafka</groupId>
   <artifactId>endless-producer</artifactId>
-  <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+  <name>Transactional Producer: a producer, that uses transactions</name>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
index bc617a8..d38b4ac 100644 (file)
@@ -32,6 +32,7 @@ public class Application
             properties.getClientId(),
             properties.getTopic(),
             properties.getAcks(),
+            properties.getCommitIntervalMs(),
             properties.getThrottleMs());
 
     producer.start();
index ab26890..54b5a32 100644 (file)
@@ -13,5 +13,6 @@ public class ApplicationProperties
   private String clientId;
   private String topic;
   private String acks;
+  private int commitIntervalMs;
   private int throttleMs;
 }
index 1620693..65fb25a 100644 (file)
@@ -21,12 +21,14 @@ public class EndlessProducer implements Runnable
   private final String id;
   private final String topic;
   private final String acks;
+  private final int commitIntervalMs;
   private final int throttleMs;
   private final KafkaProducer<String, String> producer;
 
   private boolean running = false;
   private long i = 0;
   private long produced = 0;
+  private long lastCommit;
   private Future<?> future = null;
 
   public EndlessProducer(
@@ -35,16 +37,19 @@ public class EndlessProducer implements Runnable
       String clientId,
       String topic,
       String acks,
+      int commitIntervalMs,
       int throttleMs)
   {
     this.executor = executor;
     this.id = clientId;
     this.topic = topic;
     this.acks = acks;
+    this.commitIntervalMs = commitIntervalMs;
     this.throttleMs = throttleMs;
 
     Properties props = new Properties();
     props.put("bootstrap.servers", bootstrapServer);
+    props.put("transactional.id", clientId);
     props.put("client.id", clientId);
     props.put("acks", acks);
     props.put("key.serializer", StringSerializer.class.getName());
@@ -58,6 +63,12 @@ public class EndlessProducer implements Runnable
   {
     try
     {
+      producer.initTransactions();
+
+      lastCommit = System.currentTimeMillis();
+      log.info("{} - Beginning transaction", id);
+      producer.beginTransaction();
+
       for (; running; i++)
       {
         final long time = System.currentTimeMillis();
@@ -114,6 +125,15 @@ public class EndlessProducer implements Runnable
             now - time
         );
 
+        if (now - lastCommit >= commitIntervalMs)
+        {
+          log.info("{} - Commiting transaction", id);
+          producer.commitTransaction();
+          lastCommit = now;
+          log.info("{} - Beginning new transaction", id);
+          producer.beginTransaction();
+        }
+
         if (throttleMs > 0)
         {
           try
@@ -127,11 +147,14 @@ public class EndlessProducer implements Runnable
         }
       }
 
+      log.info("{} - Commiting transaction", id);
+      producer.commitTransaction();
       log.info("{} - Done", id);
     }
     catch (Exception e)
     {
-
+      log.info("{} - Aborting transaction", id);
+      producer.abortTransaction();
     }
   }
 
index e4ae52a..b8652df 100644 (file)
@@ -2,7 +2,8 @@ producer:
   bootstrap-server: :9092
   client-id: peter
   topic: test
-  acks: 1
+  acks: all
+  commit-interval-ms: 2000
   throttle-ms: 1000
 management:
   endpoints: