* Additionally added the service `spickzettel` for reading out the offsets topic.
#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-assign-SNAPSHOT
if [ "$1" = "cleanup" ]
then
stop_grace_period: 0s
depends_on:
- cli
+ - spickzettel
zoonavigator:
image: elkozmon/zoonavigator:1.1.2
juplo.producer.throttle-ms: 100
consumer:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-assign-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer
- juplo.consumer.topic: test
+ juplo.consumer.partitions: test:0,test:1
+
+ spickzettel:
+ image: juplo/toolbox
+ command: >
+ bash -c '
+ kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic __consumer_offsets --from-beginning \
+ --formatter "kafka.coordinator.group.GroupMetadataManager\$$OffsetsMessageFormatter"
+ '
volumes:
zookeeper-data:
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super simple consumer group that is implemented as a Spring Boot application and configured by Spring Kafka</description>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1-assign-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.util.Arrays;
import java.util.Properties;
return
new ExampleConsumer(
properties.getClientId(),
- properties.getConsumerProperties().getTopic(),
+ Arrays
+ .stream(properties.getConsumerProperties().getPartitions())
+ .map(partition ->
+ {
+ String[] parts = partition.split(":");
+ return new TopicPartition(parts[0], Integer.parseInt(parts[1]));
+ })
+ .toList(),
kafkaConsumer,
() -> applicationContext.close());
}
private String groupId;
@NotNull
@NotEmpty
- private String topic;
+ private String[] partitions;
private OffsetReset autoOffsetReset;
private Duration autoCommitInterval;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
-import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
@Slf4j
public class ExampleConsumer implements Runnable
{
private final String id;
- private final String topic;
+ private final List<TopicPartition> partitions;
private final Consumer<String, String> consumer;
private final Thread workerThread;
private final Runnable closeCallback;
public ExampleConsumer(
String clientId,
- String topic,
+ List<TopicPartition> partitions,
Consumer<String, String> consumer,
Runnable closeCallback)
{
this.id = clientId;
- this.topic = topic;
+ this.partitions = partitions;
this.consumer = consumer;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
{
try
{
- log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ log.info(
+ "{} - Assigning to partitions: {}",
+ id,
+ partitions
+ .stream()
+ .map(TopicPartition::toString)
+ .collect(Collectors.joining(", ")));
+ consumer.assign(partitions);
running = true;
while (running)
client-id: DEV
consumer:
group-id: my-group
- topic: test
+ partitions: test:0,test:1
auto-offset-reset: earliest
auto-commit-interval: 5s
management:
client-id: ${juplo.client-id}
consumer:
group-id: ${juplo.consumer.group-id}
- topic: ${juplo.consumer.topic}
+ partitions: ${juplo.consumer.partitions}
auto-offset-reset: ${juplo.consumer.auto-offset-reset}
auto-commit-interval: ${juplo.consumer.auto-commit-interval}
logging:
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest",
- "juplo.consumer.topic=" + TOPIC })
+ "juplo.consumer.partitions=" + TOPIC + ":0" })
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
public class ApplicationTests