X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fseek;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fseek%2FConsumer.java;h=1fb4cdd3cda19042da0587c7e49c75cf2bb28665;hp=0c232d638865263f1b5df4a0d3a717a03bb079e9;hb=2022198b31ce426538388105dc26114ad393739b;hpb=7fa06f9b4d341fb34fec502ec4f18048231a43a7 diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java index 0c232d6..1fb4cdd 100644 --- a/src/main/java/de/juplo/kafka/seek/Consumer.java +++ b/src/main/java/de/juplo/kafka/seek/Consumer.java @@ -4,8 +4,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; @@ -45,7 +45,7 @@ public class Consumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", clientId); - props.put("key.deserializer", LongDeserializer.class.getName()); + props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); consumer = new KafkaConsumer<>(props); @@ -81,6 +81,10 @@ public class Consumer implements Runnable { log.info("{} - RIIING!", id); } + catch(Exception e) + { + log.error("{} - Unexpected error: {}", id, e.toString()); + } finally { log.info("{} - Unsubscribing...", id); @@ -89,6 +93,18 @@ public class Consumer implements Runnable } } + + public void seek(long offset) + { + consumer + .partitionsFor(topic) + .forEach(partition -> + consumer.seek( + new TopicPartition(topic, partition.partition()), + offset)); + } + + public synchronized void start() { if (running) @@ -109,6 +125,7 @@ public class Consumer implements Runnable future.get(); } + @PreDestroy public void destroy() throws ExecutionException, InterruptedException {