X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=b3dd446d84ddbc7fddd44d516cbd921eff1ece7e;hb=8c734b77dd71f6b35707e8085d59ac5b6c43720d;hp=da2f8f0c29653a2420c1769c0db75cbc6db872c3;hpb=31530b8f210b923f5e010104b8a514a2060e7665;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index da2f8f0..b3dd446 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,6 +25,7 @@ public class EndlessConsumer implements Runnable private final String groupId; private final String id; private final String topic; + private final String autoOffsetReset; private AtomicBoolean running = new AtomicBoolean(); private long consumed = 0; @@ -36,13 +37,15 @@ public class EndlessConsumer implements Runnable String bootstrapServer, String groupId, String clientId, - String topic) + String topic, + String autoOffsetReset) { this.executor = executor; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; this.topic = topic; + this.autoOffsetReset = autoOffsetReset; } @Override @@ -54,7 +57,7 @@ public class EndlessConsumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", id); - props.put("auto.offset.reset", "earliest"); + props.put("auto.offset.reset", autoOffsetReset); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName());