From 09bde51b91eaee9b51587a2a0b813bfc240d82c5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 11 Jun 2021 10:38:07 +0200 Subject: [PATCH] WIP --- README.sh | 27 ++++++++++++++----- .../java/de/juplo/kafka/seek/Consumer.java | 22 ++++++++++----- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/README.sh b/README.sh index e01b9c5..f36d5b0 100755 --- a/README.sh +++ b/README.sh @@ -3,27 +3,40 @@ if [ "$1" = "cleanup" ] then docker-compose down -v + mvn clean + docker image rm juplo/seek:1.0-SNAPSHOT exit fi docker-compose up -d zookeeper kafka +if [[ + $(docker image ls -q juplo/seek:1.0-SNAPSHOT) == "" || + "$1" = "build" +]] +then + mvn install || exit +else + echo "Using image existing images:" + docker image ls juplo/seek:1.0-SNAPSHOT +fi + while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done -docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic foo - -docker-compose up -d producer consumer +docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 7 --topic test +docker-compose up -d producer +docker-compose up -d perter franz beate ute klaus paul sigi sleep 3 -docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest +docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest sleep 3 -docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest +docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest sleep 3 -docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest +docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest -docker-compose stop producer consumer +docker-compose stop producer franz beate ute klaus paul sigi docker-compose logs consumer diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java index 1fb4cdd..7376945 100644 --- a/src/main/java/de/juplo/kafka/seek/Consumer.java +++ b/src/main/java/de/juplo/kafka/seek/Consumer.java @@ -26,6 +26,7 @@ public class Consumer implements Runnable private final KafkaConsumer consumer; private boolean running = false; + Long offset = null; Future future = null; @@ -45,6 +46,7 @@ public class Consumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", clientId); + props.put("commit.interval.ms", 500); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); @@ -65,6 +67,18 @@ public class Consumer implements Runnable while (running) { + if (offset != null) + { + log.info("{} - seeking to offset {}", id, offset); + consumer + .partitionsFor(topic) + .forEach(partition -> + consumer.seek( + new TopicPartition(topic, partition.partition()), + offset)); + offset = null; + } + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record : records) log.info( @@ -90,18 +104,14 @@ public class Consumer implements Runnable log.info("{} - Unsubscribing...", id); consumer.unsubscribe(); running = false; + offset = null; } } public void seek(long offset) { - consumer - .partitionsFor(topic) - .forEach(partition -> - consumer.seek( - new TopicPartition(topic, partition.partition()), - offset)); + this.offset = offset; } -- 2.20.1