From 25bc9398e6d81468e9605655e8c36a3caa347401 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 24 Jul 2022 17:55:17 +0200 Subject: [PATCH] =?utf8?q?Nachtrag=20f=C3=BCr=20beim=20Merge=20von=20'dese?= =?utf8?q?rialization'=20=C3=BCbersehene=20=C3=84nderung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Beim Merge der Änderungen aus 'deserialization' wurde versehentlich die Konfigurations-Option `enable.auto.commit=false` entfernt. * Dadurch hat die Kafka-Clientlibary parallel weitre Commits nach Kafka geschrieben, die eigentlich nicht mehr benötigt werden, weil die Commits ja zusammen mit den Statistiken in der MongoDB gespeichert werden. * Bei der Korrektur ist außerdem aufgefallen, dass in dem Branch 'deserialization' einige `Consumer.commitSync()`-Aufrufe übersehen wurden, die durch das explizite Speichern der Offsets nun auch nicht mehr benötigt werden. --- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 + src/main/java/de/juplo/kafka/EndlessConsumer.java | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 542d77d..a6b536b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -58,6 +58,7 @@ public class ApplicationConfiguration props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); + props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 66fea35..39e7b1b 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -134,7 +134,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl catch(WakeupException e) { log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); shutdown(); } catch(RecordDeserializationException e) @@ -148,7 +147,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl offset, e.getCause().toString()); - consumer.commitSync(); shutdown(e); } catch(Exception e) -- 2.20.1