From dd8f0914b63092386b81422b03547f4ad628c444 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 3 Jun 2023 12:28:28 +0200 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/EndlessConsumer.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 771b3a2..935e6d4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -77,17 +77,27 @@ public class EndlessConsumer implements Runnable offsetRecorder.recordOffset(record.partition(), record.offset()); handler.accept(record); + consumed++; + } - long now = clock.millis(); - if (now - lastCommit >= autoCommitIntervalMs) + long now = clock.millis(); + if (now - lastCommit >= autoCommitIntervalMs) + { + Map pendingOffsets = offsetRecorder.getAndClearOffsets(); + if (pendingOffsets.isEmpty()) { + log.info("No pending offsets to commit after {}", Duration.ofMillis(now - lastCommit)); + } + else + { + log.info("Committing pending offsets after {}", Duration.ofMillis(now - lastCommit)); producer.sendOffsetsToTransaction( - offsetRecorder.getOffsets(), + pendingOffsets, consumer.groupMetadata()); producer.commitTransaction(); + lastCommit = now; producer.beginTransaction(); } - consumed++; } } } @@ -95,7 +105,7 @@ public class EndlessConsumer implements Runnable { log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); producer.sendOffsetsToTransaction( - offsetRecorder.getOffsets(), + offsetRecorder.getAndClearOffsets(), consumer.groupMetadata()); producer.commitTransaction(); shutdown(); @@ -112,7 +122,7 @@ public class EndlessConsumer implements Runnable e.getCause().toString()); producer.sendOffsetsToTransaction( - offsetRecorder.getOffsets(), + offsetRecorder.getAndClearOffsets(), consumer.groupMetadata()); producer.commitTransaction(); shutdown(e); @@ -157,7 +167,7 @@ public class EndlessConsumer implements Runnable offsets[partition] = offset + 1; } - Map getOffsets() + Map getAndClearOffsets() { Map recordedOffsets = new HashMap<>(); @@ -165,10 +175,11 @@ public class EndlessConsumer implements Runnable { if (activePartitions[i] && offsets[i] > -1) { - log.info("Offset for partition {} is {}", i, offsets[i]); + log.info("Offset {} of partition {} is pending for a commit", offsets[i], i); recordedOffsets.put( new TopicPartition(topic, i), new OffsetAndMetadata(offsets[i])); + offsets[i] = -1; } } -- 2.20.1