WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 19:57:32 +0000 (20:57 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 15:49:53 +0000 (16:49 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 73f56a8..eff1aed 100644 (file)
@@ -169,7 +169,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
 
-    producer.commitTransaction();
+    commit();
   }
 
   private void commitIfNecessary()
@@ -177,9 +177,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     Instant now = clock.instant();
     if (!now.isBefore(lastCommit.plus(commitInterval)))
     {
-      producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
-      commit();
-      lastCommit = now;
+      commit(now);
     }
   }
 
@@ -193,6 +191,19 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   }
 
   private void commit()
+  {
+    commit(clock.instant());
+  }
+
+  private void commit(Instant now)
+  {
+    log.info("{} - Sending offsets to transaction and commiting", id);
+    producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+    commitTx();
+    lastCommit = now;
+  }
+
+  private void commitTx()
   {
     producer.commitTransaction();
     producer.beginTransaction();
@@ -364,6 +375,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    commit();
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))