#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-seek-SNAPSHOT
if [ "$1" = "cleanup" ]
then
fi
docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf consumer seek
if [[
$(docker image ls -q $IMAGE) == "" ||
docker compose -f docker/docker-compose.yml up -d producer
docker compose -f docker/docker-compose.yml up -d consumer
+sleep 5
+docker compose -f docker/docker-compose.yml stop producer
+docker compose -f docker/docker-compose.yml up -d seek
sleep 5
docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml restart seek
+sleep 10
+docker compose -f docker/docker-compose.yml logs seek
juplo.producer.throttle-ms: 100
consumer:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-seek-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer
juplo.consumer.topic: test
+ seek:
+ image: juplo/spring-consumer:1.1-seek-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: seek
+ juplo.consumer.topic: test
+ juplo.consumer.offsets: 0=5,1=7
+
volumes:
zookeeper-data:
zookeeper-log:
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1-seek-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
@Configuration
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
+ Map<TopicPartition, Long> offsets;
+
+ if (properties.getConsumerProperties().getOffsets() == null)
+ {
+ offsets = new HashMap<>();
+ }
+ else
+ {
+ offsets = Arrays
+ .stream(properties.getConsumerProperties().getOffsets())
+ .map(partition -> partition.split("="))
+ .collect(Collectors.toMap(
+ parts -> new TopicPartition(properties.getConsumerProperties().getTopic(), Integer.parseInt(parts[0])),
+ parts -> Long.parseLong(parts[1])));
+ }
+
return
new ExampleConsumer(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
+ offsets,
kafkaConsumer,
() -> applicationContext.close());
}
@NotNull
@NotEmpty
private String topic;
+ private String[] offsets;
private OffsetReset autoOffsetReset;
private Duration autoCommitInterval;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
-import java.util.Arrays;
+import java.util.*;
@Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
{
private final String id;
private final String topic;
+ private final Map<TopicPartition, Long> offsets;
private final Consumer<String, String> consumer;
private final Thread workerThread;
private final Runnable closeCallback;
public ExampleConsumer(
String clientId,
String topic,
+ Map<TopicPartition, Long> offsets,
Consumer<String, String> consumer,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
+ this.offsets = offsets;
this.consumer = consumer;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
running = true;
while (running)
}
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(partition ->
+ {
+ Long offset = offsets.get(partition);
+ if (offset != null)
+ {
+ log.info("{} - Seeking to offset {} for partition {}", id, offset, partition);
+ consumer.seek(partition, offset);
+ offsets.remove(partition);
+ }
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ }
+
+
public void shutdown() throws InterruptedException
{
log.info("{} joining the worker-thread...", id);