Nachtrag für beim Merge von 'deserialization' übersehene Änderung
authorKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 15:55:17 +0000 (17:55 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 18:30:43 +0000 (20:30 +0200)
* Beim Merge der Änderungen aus 'deserialization' wurde versehentlich
  die Konfigurations-Option `enable.auto.commit=false` entfernt.
* Dadurch hat die Kafka-Clientlibary parallel weitre Commits nach Kafka
  geschrieben, die eigentlich nicht mehr benötigt werden, weil die
  Commits ja zusammen mit den Statistiken in der MongoDB gespeichert werden.
* Bei der Korrektur ist außerdem aufgefallen, dass in dem Branch
  'deserialization' einige `Consumer.commitSync()`-Aufrufe übersehen wurden,
  die durch das explizite Speichern der Offsets nun auch nicht mehr benötigt
  werden.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 542d77d..a6b536b 100644 (file)
@@ -58,6 +58,7 @@ public class ApplicationConfiguration
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
+    props.put("enable.auto.commit", false);
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
index 66fea35..39e7b1b 100644 (file)
@@ -134,7 +134,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -148,7 +147,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)