From 6ebaf2f448316e0240fa0314a8a958174ae18462 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 12:10:10 +0100 Subject: [PATCH] Version des ``spring-consumer``, die zu vorgegebenen Offsets springt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die übergebenen Offsets werden bei der Zuteilung der Partitionen verarbeitet. * Anschließend werden die übergebenen Start-Offsets gelöscht, so dass bei späteren Rebalances kein erneuter Wechsel der Offset-Position erfolgt. * Die Anwendung überprüft nicht, ob die Menge der übergebenen Offsets vollständig ist, oder zu den zugeteilten Partitionen passt. * D.h. insbesondere, dass ein Wechsel der Offset-Positionen für eine bestimmte Partition ggf. erst später erfolgt, wenn diese Partition einer anderen Instanz zugeteilt ist und diese Instanz die Partition erst später frei gibt. * Docker-Setup und `README.sh` zur Vorführung angepasst --- README.sh | 15 +++++---- docker/docker-compose.yml | 10 +++++- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 22 +++++++++++++ .../de/juplo/kafka/ApplicationProperties.java | 1 + .../java/de/juplo/kafka/ExampleConsumer.java | 32 +++++++++++++++++-- 6 files changed, 70 insertions(+), 12 deletions(-) diff --git a/README.sh b/README.sh index b46e235..3ec6309 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-seek-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer +docker compose -f docker/docker-compose.yml rm -svf consumer seek if [[ $(docker image ls -q $IMAGE) == "" || @@ -29,11 +29,12 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 docker compose -f docker/docker-compose.yml up -d producer docker compose -f docker/docker-compose.yml up -d consumer +sleep 5 +docker compose -f docker/docker-compose.yml stop producer +docker compose -f docker/docker-compose.yml up -d seek sleep 5 docker compose -f docker/docker-compose.yml stop consumer - -docker compose -f docker/docker-compose.yml start consumer sleep 5 - -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer +docker compose -f docker/docker-compose.yml restart seek +sleep 10 +docker compose -f docker/docker-compose.yml logs seek diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6bd2766..84c2d50 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -199,12 +199,20 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-seek-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer juplo.consumer.topic: test + seek: + image: juplo/spring-consumer:1.1-seek-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: seek + juplo.consumer.topic: test + juplo.consumer.offsets: 0=5,1=7 + volumes: zookeeper-data: zookeeper-log: diff --git a/pom.xml b/pom.xml index 98a0a36..405a349 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-seek-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a6..f8547a9 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,13 +3,18 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; @Configuration @@ -22,10 +27,27 @@ public class ApplicationConfiguration ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { + Map offsets; + + if (properties.getConsumerProperties().getOffsets() == null) + { + offsets = new HashMap<>(); + } + else + { + offsets = Arrays + .stream(properties.getConsumerProperties().getOffsets()) + .map(partition -> partition.split("=")) + .collect(Collectors.toMap( + parts -> new TopicPartition(properties.getConsumer().getTopic(), Integer.parseInt(parts[0])), + parts -> Long.parseLong(parts[1]))); + } + return new ExampleConsumer( properties.getClientId(), properties.getConsumerProperties().getTopic(), + offsets, kafkaConsumer, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9..ae97d75 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -44,6 +44,7 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; + private String[] offsets; private OffsetReset autoOffsetReset; private Duration autoCommitInterval; diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f832b45..8cb3698 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,19 +2,22 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; -import java.util.Arrays; +import java.util.*; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { private final String id; private final String topic; + private final Map offsets; private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -26,11 +29,13 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, + Map offsets, Consumer consumer, Runnable closeCallback) { this.id = clientId; this.topic = topic; + this.offsets = offsets; this.consumer = consumer; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); @@ -46,7 +51,7 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), this); running = true; while (running) @@ -97,6 +102,27 @@ public class ExampleConsumer implements Runnable } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(partition -> + { + Long offset = offsets.get(partition); + if (offset != null) + { + log.info("{} - Seeking to offset {} for partition {}", id, offset, partition); + consumer.seek(partition, offset); + offsets.remove(partition); + } + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + } + + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id); -- 2.20.1