* Beim Merge der Änderungen aus 'deserialization' wurde versehentlich
die Konfigurations-Option `enable.auto.commit=false` entfernt.
* Dadurch hat die Kafka-Clientlibary parallel weitre Commits nach Kafka
geschrieben, die eigentlich nicht mehr benötigt werden, weil die
Commits ja zusammen mit den Statistiken in der MongoDB gespeichert werden.
* Bei der Korrektur ist außerdem aufgefallen, dass in dem Branch
'deserialization' einige `Consumer.commitSync()`-Aufrufe übersehen wurden,
die durch das explizite Speichern der Offsets nun auch nicht mehr benötigt
werden.
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("group.id", properties.getGroupId());
props.put("client.id", properties.getClientId());
+ props.put("enable.auto.commit", false);
props.put("auto.offset.reset", properties.getAutoOffsetReset());
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
catch(WakeupException e)
{
log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
- consumer.commitSync();
shutdown();
}
catch(RecordDeserializationException e)
offset,
e.getCause().toString());
- consumer.commitSync();
shutdown(e);
}
catch(Exception e)