WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 20:53:41 +0000 (21:53 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 15:49:53 +0000 (16:49 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index eff1aed..a7e5bcb 100644 (file)
@@ -169,7 +169,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
 
-    commit();
+    sendOffsets();
+    log.info("{} - Final commit for transaction", id);
+    producer.commitTransaction();
   }
 
   private void commitIfNecessary()
@@ -197,14 +199,20 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void commit(Instant now)
   {
-    log.info("{} - Sending offsets to transaction and commiting", id);
-    producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+    sendOffsets();
     commitTx();
     lastCommit = now;
   }
 
+  private void sendOffsets()
+  {
+    log.info("{} - Sending offsets to transaction", id);
+    producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+  }
+
   private void commitTx()
   {
+    log.info("{} - Rolling transaction", id);
     producer.commitTransaction();
     producer.beginTransaction();
   }