WIP
authorKai Moritz <kai@juplo.de>
Sat, 3 Jun 2023 10:28:28 +0000 (12:28 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 17 Jun 2023 12:43:14 +0000 (14:43 +0200)
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 771b3a2..935e6d4 100644 (file)
@@ -77,17 +77,27 @@ public class EndlessConsumer<K, V> implements Runnable
 
           offsetRecorder.recordOffset(record.partition(), record.offset());
           handler.accept(record);
+          consumed++;
+        }
 
-          long now = clock.millis();
-          if (now - lastCommit >= autoCommitIntervalMs)
+        long now = clock.millis();
+        if (now - lastCommit >= autoCommitIntervalMs)
+        {
+          Map<TopicPartition, OffsetAndMetadata> pendingOffsets = offsetRecorder.getAndClearOffsets();
+          if (pendingOffsets.isEmpty())
           {
+            log.info("No pending offsets to commit after {}", Duration.ofMillis(now - lastCommit));
+          }
+          else
+          {
+            log.info("Committing pending offsets after {}", Duration.ofMillis(now - lastCommit));
             producer.sendOffsetsToTransaction(
-                offsetRecorder.getOffsets(),
+                pendingOffsets,
                 consumer.groupMetadata());
             producer.commitTransaction();
+            lastCommit = now;
             producer.beginTransaction();
           }
-          consumed++;
         }
       }
     }
@@ -95,7 +105,7 @@ public class EndlessConsumer<K, V> implements Runnable
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
       producer.sendOffsetsToTransaction(
-          offsetRecorder.getOffsets(),
+          offsetRecorder.getAndClearOffsets(),
           consumer.groupMetadata());
       producer.commitTransaction();
       shutdown();
@@ -112,7 +122,7 @@ public class EndlessConsumer<K, V> implements Runnable
           e.getCause().toString());
 
       producer.sendOffsetsToTransaction(
-          offsetRecorder.getOffsets(),
+          offsetRecorder.getAndClearOffsets(),
           consumer.groupMetadata());
       producer.commitTransaction();
       shutdown(e);
@@ -157,7 +167,7 @@ public class EndlessConsumer<K, V> implements Runnable
       offsets[partition] = offset + 1;
     }
 
-    Map<TopicPartition, OffsetAndMetadata> getOffsets()
+    Map<TopicPartition, OffsetAndMetadata> getAndClearOffsets()
     {
       Map<TopicPartition, OffsetAndMetadata> recordedOffsets = new HashMap<>();
 
@@ -165,10 +175,11 @@ public class EndlessConsumer<K, V> implements Runnable
       {
         if (activePartitions[i] && offsets[i] > -1)
         {
-          log.info("Offset for partition {} is {}", i, offsets[i]);
+          log.info("Offset {} of partition {} is pending for a commit", offsets[i], i);
           recordedOffsets.put(
               new TopicPartition(topic, i),
               new OffsetAndMetadata(offsets[i]));
+          offsets[i] = -1;
         }
       }