From 1b4dfd3e4e9a146fb24604208727cf1894128346 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 21:53:41 +0100 Subject: [PATCH] WIP --- src/main/java/de/juplo/kafka/ExampleConsumer.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index eff1aed..a7e5bcb 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -169,7 +169,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } - commit(); + sendOffsets(); + log.info("{} - Final commit for transaction", id); + producer.commitTransaction(); } private void commitIfNecessary() @@ -197,14 +199,20 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void commit(Instant now) { - log.info("{} - Sending offsets to transaction and commiting", id); - producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata()); + sendOffsets(); commitTx(); lastCommit = now; } + private void sendOffsets() + { + log.info("{} - Sending offsets to transaction", id); + producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata()); + } + private void commitTx() { + log.info("{} - Rolling transaction", id); producer.commitTransaction(); producer.beginTransaction(); } -- 2.20.1