--- /dev/null
+*
+!target/*.jar
--- /dev/null
+target
+.idea
+*.iml
--- /dev/null
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
--- /dev/null
+#!/bin/bash
+
+if [ "$1" = "cleanup" ]
+then
+ docker-compose down -v
+ exit
+fi
+
+docker-compose up -d zookeeper kafka
+
+while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+ echo "Waiting for kafka...";
+ sleep 1;
+done
+
+docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic foo
+
+docker-compose up -d producer consumer
+
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest
+
+docker-compose stop producer consumer
+docker-compose logs consumer
--- /dev/null
+version: "3"
+
+services:
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:6.0.1
+ ports:
+ - 2181:2181
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: confluentinc/cp-kafka:6.0.1
+ ports:
+ - 9092:9092
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ depends_on:
+ - zookeeper
+
+ producer:
+ image: confluentinc/cp-kafkacat:6.0.1
+ command:
+ bash -c '
+ export A=0;
+ while true;
+ do
+ export A=$$(($$A + 1));
+ echo -n $$A;
+ echo $$A | kafkacat -b kafka:9093 -t foo -k $$A%7;
+ done'
+ tty: true
+
+ peter:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8001:8001
+ environment:
+ server.port: 8001
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: peter
+ seek.topic: test
+
+ franz:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8002:8002
+ environment:
+ server.port: 8002
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: franz
+ seek.topic: test
+
+ beate:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8003:8003
+ environment:
+ server.port: 8003
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: beate
+ seek.topic: test
+
+ ute:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8004:8004
+ environment:
+ server.port: 8004
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: ute
+ seek.topic: test
+
+ klaus:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8005:8005
+ environment:
+ server.port: 8005
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: klaus
+ seek.topic: test
+
+ paul:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8006:8006
+ environment:
+ server.port: 8006
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: paul
+ seek.topic: test
+
+ siggi:
+ image: juplo/seek:1.0-SNAPSHOT
+ ports:
+ - 8007:8007
+ environment:
+ server.port: 8007
+ seek.bootstrap-server: kafka:9093
+ seek.group-id: seek
+ seek.client-id: siggi
+ seek.topic: test
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.1.5.RELEASE</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+
+ <groupId>de.juplo.kafka</groupId>
+ <artifactId>seek</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <name>Seek Example</name>
+ <description>Can I Seek All Partitions Of A Running Consumer-Group From A Single Instance</description>
+
+ <properties>
+ <java.version>11</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/%a:%v</name>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.seek;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.Executors;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+
+ @Autowired
+ ApplicationProperties properties;
+
+
+ @Bean
+ public Consumer consumer()
+ {
+ Assert.hasText(properties.getBootstrapServer(), "seek.bootstrap-server must be set");
+ Assert.hasText(properties.getGroupId(), "seek.group-id must be set");
+ Assert.hasText(properties.getClientId(), "seek.client-id must be set");
+ Assert.hasText(properties.getTopic(), "seek.topic must be set");
+
+ return
+ new Consumer(
+ Executors.newFixedThreadPool(1),
+ properties.getBootstrapServer(),
+ properties.getGroupId(),
+ properties.getClientId(),
+ properties.getTopic());
+ }
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(Application.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.seek;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "seek")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+ private String bootstrapServer;
+ private String groupId;
+ private String clientId;
+ private String topic;
+}
--- /dev/null
+package de.juplo.kafka.seek;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@Slf4j
+public class Consumer implements Runnable
+{
+ private final ExecutorService executor;
+ private final String id;
+ private final String topic;
+ private final KafkaConsumer<Long, String> consumer;
+
+ private boolean running = false;
+ Future<?> future = null;
+
+
+ public Consumer(
+ ExecutorService executor,
+ String bootstrapServer,
+ String groupId,
+ String clientId,
+ String topic)
+ {
+ this.executor = executor;
+
+ this.id = clientId;
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServer);
+ props.put("group.id", groupId);
+ props.put("client.id", clientId);
+ props.put("key.deserializer", LongDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ consumer = new KafkaConsumer<>(props);
+ }
+
+
+ @Override
+ public void run()
+ {
+ log.info("{} - Subscribing to topic test", id);
+ consumer.subscribe(Arrays.asList(topic));
+
+ try
+ {
+
+ running = true;
+
+ while (running)
+ {
+ ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Long, String> record : records)
+ log.info(
+ "{} - {}: {}/{} - {}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.value()
+ );
+ }
+ }
+ catch(WakeupException e)
+ {
+ log.info("{} - RIIING!", id);
+ }
+ finally
+ {
+ log.info("{} - Unsubscribing...", id);
+ consumer.unsubscribe();
+ running = false;
+ }
+ }
+
+ public synchronized void start()
+ {
+ if (running)
+ throw new RuntimeException("Consumier instance " + id + " is already running!");
+
+ log.info("Running {}", id);
+ future = executor.submit(this);
+ }
+
+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ if (!running)
+ throw new RuntimeException("Consumier instance " + id + " is not running!");
+
+ log.info("Stopping {}", id);
+ running = false;
+ consumer.wakeup();
+ future.get();
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ stop();
+ log.info("{} - Closing the KafkaConsumer", id);
+ consumer.close(Duration.ofSeconds(3));
+ }
+}
--- /dev/null
+package de.juplo.kafka.seek;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.ExecutionException;
+
+
+@RestController
+@RequiredArgsConstructor
+public class SeekController
+{
+ private final Consumer consumer;
+
+
+ @PostMapping("start")
+ public void start()
+ {
+ consumer.start();
+ }
+
+ @PostMapping("stop")
+ public void stop() throws ExecutionException, InterruptedException
+ {
+ consumer.stop();
+ }
+}
--- /dev/null
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
--- /dev/null
+package de.juplo.kafka.seek;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(properties = {
+ "seek.bootstrap-server=:9092",
+ "seek.topic=test",
+ "seek.id=peter" })
+public class ApplicationTests
+{
+ @Test
+ public void contextLoads() {}
+}