From b97a263cc7df3ebd1574b843713d21ea5b593e9e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 9 Nov 2024 18:36:27 +0100 Subject: [PATCH] Version des `spring-consumer`, die mit `assign()` arbeitet MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Außerdem den Service `spickzettel` zum Auslesen des Offset-Topics ergänzt. --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 15 ++++++++++++-- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 11 +++++++++- .../de/juplo/kafka/ApplicationProperties.java | 2 +- .../java/de/juplo/kafka/ExampleConsumer.java | 20 +++++++++++++------ src/main/resources/application.yml | 4 ++-- .../java/de/juplo/kafka/ApplicationTests.java | 2 +- 9 files changed, 44 insertions(+), 16 deletions(-) diff --git a/README.sh b/README.sh index b46e2350..c504e109 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-assign-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index a8614fdf..68a37dfa 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-SNAPSHOT' +version = '1.1-assign-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4fa2eade..26ef4b32 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -113,6 +113,7 @@ services: stop_grace_period: 0s depends_on: - cli + - spickzettel akhq: image: tchiotludo/akhq:0.23.0 @@ -145,11 +146,21 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-assign-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer - juplo.consumer.topic: test + juplo.consumer.partitions: test:0,test:1 + + spickzettel: + image: juplo/toolbox + command: > + bash -c ' + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic __consumer_offsets --from-beginning \ + --formatter "kafka.coordinator.group.GroupMetadataManager\$$OffsetsMessageFormatter" + ' peter: image: juplo/spring-consumer:1.1-SNAPSHOT diff --git a/pom.xml b/pom.xml index dd96d00f..8898423c 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-assign-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a64..f309ae15 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,12 +3,14 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Arrays; import java.util.Properties; @@ -25,7 +27,14 @@ public class ApplicationConfiguration return new ExampleConsumer( properties.getClientId(), - properties.getConsumerProperties().getTopic(), + Arrays + .stream(properties.getConsumerProperties().getPartitions()) + .map(partition -> + { + String[] parts = partition.split(":"); + return new TopicPartition(parts[0], Integer.parseInt(parts[1])); + }) + .toList(), kafkaConsumer, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9f..9e6f5e3c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -43,7 +43,7 @@ public class ApplicationProperties private String groupId; @NotNull @NotEmpty - private String topic; + private String[] partitions; private OffsetReset autoOffsetReset; private Duration autoCommitInterval; diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 79382ef9..ec0c2541 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,17 +4,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; -import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; @Slf4j public class ExampleConsumer implements Runnable { private final String id; - private final String topic; + private final List partitions; private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,12 +27,12 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, - String topic, + List partitions, Consumer consumer, Runnable closeCallback) { this.id = clientId; - this.topic = topic; + this.partitions = partitions; this.consumer = consumer; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); @@ -45,8 +47,14 @@ public class ExampleConsumer implements Runnable { try { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + log.info( + "{} - Assigning to partitions: {}", + id, + partitions + .stream() + .map(TopicPartition::toString) + .collect(Collectors.joining(", "))); + consumer.assign(partitions); running = true; while (running) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..c5f21ec4 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,7 +3,7 @@ juplo: client-id: DEV consumer: group-id: my-group - topic: test + partitions: test:0,test:1 auto-offset-reset: earliest auto-commit-interval: 5s management: @@ -25,7 +25,7 @@ info: client-id: ${juplo.client-id} consumer: group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} + partitions: ${juplo.consumer.partitions} auto-offset-reset: ${juplo.consumer.auto-offset-reset} auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 092660b4..5be4ee54 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -20,7 +20,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.consumer.topic=" + TOPIC }) + "juplo.consumer.partitions=" + TOPIC + ":0" }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests -- 2.20.1