X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fseek%2FConsumer.java;h=77e156b2ced1eec1cd85c9c37156a13f86e39fcf;hb=HEAD;hp=1fb4cdd3cda19042da0587c7e49c75cf2bb28665;hpb=2022198b31ce426538388105dc26114ad393739b;p=demos%2Fkafka%2Fseek diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java index 1fb4cdd..77e156b 100644 --- a/src/main/java/de/juplo/kafka/seek/Consumer.java +++ b/src/main/java/de/juplo/kafka/seek/Consumer.java @@ -26,6 +26,7 @@ public class Consumer implements Runnable private final KafkaConsumer consumer; private boolean running = false; + Long offset = null; Future future = null; @@ -45,6 +46,7 @@ public class Consumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", clientId); + props.put("commit.interval.ms", 500); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); @@ -55,16 +57,25 @@ public class Consumer implements Runnable @Override public void run() { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList(topic)); - try { - - running = true; + log.info("{} - Subscribing to topic test", id); + consumer.subscribe(Arrays.asList(topic)); while (running) { + if (offset != null) + { + log.info("{} - seeking to offset {}", id, offset); + consumer + .partitionsFor(topic) + .forEach(partition -> + consumer.seek( + new TopicPartition(topic, partition.partition()), + offset)); + offset = null; + } + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record : records) log.info( @@ -88,20 +99,16 @@ public class Consumer implements Runnable finally { log.info("{} - Unsubscribing...", id); - consumer.unsubscribe(); running = false; + consumer.unsubscribe(); + offset = null; } } public void seek(long offset) { - consumer - .partitionsFor(topic) - .forEach(partition -> - consumer.seek( - new TopicPartition(topic, partition.partition()), - offset)); + this.offset = offset; } @@ -111,13 +118,14 @@ public class Consumer implements Runnable throw new RuntimeException("Consumier instance " + id + " is already running!"); log.info("Running {}", id); + running = true; future = executor.submit(this); } public synchronized void stop() throws ExecutionException, InterruptedException { if (!running) - throw new RuntimeException("Consumier instance " + id + " is not running!"); + throw new RuntimeException("Consumer instance " + id + " is not running!"); log.info("Stopping {}", id); running = false; @@ -125,7 +133,6 @@ public class Consumer implements Runnable future.get(); } - @PreDestroy public void destroy() throws ExecutionException, InterruptedException {