From b215b362ab60d22ba3a47714a65d75427769669a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 14:24:27 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20Seek-=C3=9Cbung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 40 ------------------- .../java/de/juplo/kafka/ExampleConsumer.java | 25 +++++++----- 2 files changed, 15 insertions(+), 50 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 3ec6309..0000000 --- a/README.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-consumer:1.1-seek-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer seek - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer - -sleep 5 -docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml up -d seek -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer -sleep 5 -docker compose -f docker/docker-compose.yml restart seek -sleep 10 -docker compose -f docker/docker-compose.yml logs seek diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 8cb3698..b71c4f2 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -54,6 +54,21 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.subscribe(Arrays.asList(topic), this); running = true; + log.info("{} - Fetching PartitionInfo for topic {}", id, topic); + consumer + .partitionsFor(topic) + .forEach(info -> + { + TopicPartition partition = new TopicPartition(topic, info.partition()); + Long offset = offsets.get(partition); + if (offset != null) + { + log.info("{} - Seeking to offset {} for partition {}", id, offset, partition); + consumer.seek(partition, offset); + offsets.remove(partition); + } + }); + while (running) { ConsumerRecords records = @@ -105,16 +120,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public void onPartitionsAssigned(Collection partitions) { - partitions.forEach(partition -> - { - Long offset = offsets.get(partition); - if (offset != null) - { - log.info("{} - Seeking to offset {} for partition {}", id, offset, partition); - consumer.seek(partition, offset); - offsets.remove(partition); - } - }); } @Override -- 2.20.1