X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=54e9b893f45ffca7303b58bf63b8d69348f65fca;hb=e53004f3133b737699e995a3b18fff28203a0e8c;hp=4054e930ff991167e8d015b80eb40021f692e1b6;hpb=28b729e55e2e0914b06c1dcb0a5defd9bcef4933;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4054e93..54e9b89 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -32,11 +32,13 @@ public class ApplicationConfiguration KafkaConsumer kafkaConsumer, ExecutorService executor, Consumer> handler, + PartitionStatisticsRepository repository, ApplicationProperties properties) { return new EndlessConsumer<>( executor, + repository, properties.getClientId(), properties.getTopic(), kafkaConsumer, @@ -57,6 +59,7 @@ public class ApplicationConfiguration props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); + props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName());