From: Kai Moritz Date: Sun, 24 Jul 2022 15:55:17 +0000 (+0200) Subject: Nachtrag für beim Merge von 'deserialization' übersehene Änderung X-Git-Tag: wip-DEPRECATED~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=25bc9398e6d81468e9605655e8c36a3caa347401;p=demos%2Fkafka%2Ftraining Nachtrag für beim Merge von 'deserialization' übersehene Änderung * Beim Merge der Änderungen aus 'deserialization' wurde versehentlich die Konfigurations-Option `enable.auto.commit=false` entfernt. * Dadurch hat die Kafka-Clientlibary parallel weitre Commits nach Kafka geschrieben, die eigentlich nicht mehr benötigt werden, weil die Commits ja zusammen mit den Statistiken in der MongoDB gespeichert werden. * Bei der Korrektur ist außerdem aufgefallen, dass in dem Branch 'deserialization' einige `Consumer.commitSync()`-Aufrufe übersehen wurden, die durch das explizite Speichern der Offsets nun auch nicht mehr benötigt werden. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 542d77d..a6b536b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -58,6 +58,7 @@ public class ApplicationConfiguration props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); + props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 66fea35..39e7b1b 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -134,7 +134,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl catch(WakeupException e) { log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); shutdown(); } catch(RecordDeserializationException e) @@ -148,7 +147,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl offset, e.getCause().toString()); - consumer.commitSync(); shutdown(e); } catch(Exception e)