From 3647e542f7d34d4e766fb7fbd302a68c4d4468d5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 20:57:32 +0100 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/ExampleConsumer.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 73f56a8..eff1aed 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -169,7 +169,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } - producer.commitTransaction(); + commit(); } private void commitIfNecessary() @@ -177,9 +177,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener Instant now = clock.instant(); if (!now.isBefore(lastCommit.plus(commitInterval))) { - producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata()); - commit(); - lastCommit = now; + commit(now); } } @@ -193,6 +191,19 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } private void commit() + { + commit(clock.instant()); + } + + private void commit(Instant now) + { + log.info("{} - Sending offsets to transaction and commiting", id); + producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata()); + commitTx(); + lastCommit = now; + } + + private void commitTx() { producer.commitTransaction(); producer.beginTransaction(); @@ -364,6 +375,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public synchronized void onPartitionsRevoked(Collection partitions) { + commit(); partitions .stream() .filter(partition -> partition.topic().equals(topic)) -- 2.20.1