From: Kai Moritz Date: Sat, 9 Nov 2024 17:36:27 +0000 (+0100) Subject: Version des `spring-consumer`, die mit `assign()` arbeitet X-Git-Tag: consumer/spring-consumer--assign--2024-11-13--si X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fconsumer%2Fspring-consumer--assign;p=demos%2Fkafka%2Ftraining Version des `spring-consumer`, die mit `assign()` arbeitet * Außerdem den Service `spickzettel` zum Auslesen des Offset-Topics ergänzt. --- diff --git a/README.sh b/README.sh index b46e235..c504e10 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-assign-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6bd2766..c3aa9b7 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -155,6 +155,7 @@ services: stop_grace_period: 0s depends_on: - cli + - spickzettel zoonavigator: image: elkozmon/zoonavigator:1.1.2 @@ -199,11 +200,21 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-assign-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer - juplo.consumer.topic: test + juplo.consumer.partitions: test:0,test:1 + + spickzettel: + image: juplo/toolbox + command: > + bash -c ' + kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --topic __consumer_offsets --from-beginning \ + --formatter "kafka.coordinator.group.GroupMetadataManager\$$OffsetsMessageFormatter" + ' volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index 98a0a36..4d47b36 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-assign-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a6..f309ae1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,12 +3,14 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Arrays; import java.util.Properties; @@ -25,7 +27,14 @@ public class ApplicationConfiguration return new ExampleConsumer( properties.getClientId(), - properties.getConsumerProperties().getTopic(), + Arrays + .stream(properties.getConsumerProperties().getPartitions()) + .map(partition -> + { + String[] parts = partition.split(":"); + return new TopicPartition(parts[0], Integer.parseInt(parts[1])); + }) + .toList(), kafkaConsumer, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9..9e6f5e3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -43,7 +43,7 @@ public class ApplicationProperties private String groupId; @NotNull @NotEmpty - private String topic; + private String[] partitions; private OffsetReset autoOffsetReset; private Duration autoCommitInterval; diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f832b45..1f1986b 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,17 +4,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; -import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; @Slf4j public class ExampleConsumer implements Runnable { private final String id; - private final String topic; + private final List partitions; private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,12 +27,12 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, - String topic, + List partitions, Consumer consumer, Runnable closeCallback) { this.id = clientId; - this.topic = topic; + this.partitions = partitions; this.consumer = consumer; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); @@ -45,8 +47,14 @@ public class ExampleConsumer implements Runnable { try { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + log.info( + "{} - Assigning to partitions: {}", + id, + partitions + .stream() + .map(TopicPartition::toString) + .collect(Collectors.joining(", "))); + consumer.assign(partitions); running = true; while (running) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731..c5f21ec 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,7 +3,7 @@ juplo: client-id: DEV consumer: group-id: my-group - topic: test + partitions: test:0,test:1 auto-offset-reset: earliest auto-commit-interval: 5s management: @@ -25,7 +25,7 @@ info: client-id: ${juplo.client-id} consumer: group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} + partitions: ${juplo.consumer.partitions} auto-offset-reset: ${juplo.consumer.auto-offset-reset} auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index e4b97a4..9ad7e94 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -21,7 +21,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.consumer.topic=" + TOPIC }) + "juplo.consumer.partitions=" + TOPIC + ":0" }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests