WIP
authorKai Moritz <kai@juplo.de>
Tue, 18 May 2021 22:22:42 +0000 (00:22 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 18 May 2021 22:22:42 +0000 (00:22 +0200)
12 files changed:
.dockerignore [new file with mode: 0644]
.gitignore [new file with mode: 0644]
Dockerfile [new file with mode: 0644]
README.sh [new file with mode: 0755]
docker-compose.yml [new file with mode: 0644]
pom.xml [new file with mode: 0644]
src/main/java/de/juplo/kafka/seek/Application.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/seek/ApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/seek/Consumer.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/seek/SeekController.java [new file with mode: 0644]
src/main/resources/application.yml [new file with mode: 0644]
src/test/java/de/juplo/kafka/seek/ApplicationTests.java [new file with mode: 0644]

diff --git a/.dockerignore b/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..612c5bc
--- /dev/null
@@ -0,0 +1,3 @@
+target
+.idea
+*.iml
diff --git a/Dockerfile b/Dockerfile
new file mode 100644 (file)
index 0000000..16ee25e
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/README.sh b/README.sh
new file mode 100755 (executable)
index 0000000..e01b9c5
--- /dev/null
+++ b/README.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+if [ "$1" = "cleanup" ]
+then
+  docker-compose down -v
+  exit
+fi
+
+docker-compose up -d zookeeper kafka
+
+while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+  echo "Waiting for kafka...";
+  sleep 1;
+done
+
+docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic foo
+
+docker-compose up -d producer consumer
+
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest 
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest 
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest 
+
+docker-compose stop producer consumer
+docker-compose logs consumer
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644 (file)
index 0000000..c6562e1
--- /dev/null
@@ -0,0 +1,113 @@
+version: "3"
+
+services:
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.0.1
+    ports:
+      - 2181:2181
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:6.0.1
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    depends_on:
+      - zookeeper
+
+  producer:
+    image: confluentinc/cp-kafkacat:6.0.1
+    command:
+      bash -c '
+      export A=0;
+      while true;
+      do
+        export A=$$(($$A + 1));
+        echo -n $$A;
+        echo $$A | kafkacat -b kafka:9093 -t foo -k $$A%7;
+      done'
+    tty: true
+
+  peter:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8001:8001
+    environment:
+      server.port: 8001
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: peter 
+      seek.topic: test
+
+  franz:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8002:8002
+    environment:
+      server.port: 8002
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: franz
+      seek.topic: test
+
+  beate:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8003:8003
+    environment:
+      server.port: 8003
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: beate
+      seek.topic: test
+
+  ute:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8004:8004
+    environment:
+      server.port: 8004
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: ute
+      seek.topic: test
+
+  klaus:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8005:8005
+    environment:
+      server.port: 8005
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: klaus
+      seek.topic: test
+
+  paul:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8006:8006
+    environment:
+      server.port: 8006
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: paul
+      seek.topic: test
+
+  siggi:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8007:8007
+    environment:
+      server.port: 8007
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: siggi 
+      seek.topic: test
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..922d525
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.1.5.RELEASE</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+
+  <groupId>de.juplo.kafka</groupId>
+  <artifactId>seek</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <name>Seek Example</name>
+  <description>Can I Seek All Partitions Of A Running Consumer-Group From A Single Instance</description>
+
+  <properties>
+    <java.version>11</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-configuration-processor</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>juplo/%a:%v</name>
+            </image>
+          </images>
+        </configuration>
+        <executions>
+          <execution>
+             <id>build</id>
+             <phase>package</phase>
+             <goals>
+               <goal>build</goal>
+             </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/src/main/java/de/juplo/kafka/seek/Application.java b/src/main/java/de/juplo/kafka/seek/Application.java
new file mode 100644 (file)
index 0000000..cddf354
--- /dev/null
@@ -0,0 +1,43 @@
+package de.juplo.kafka.seek;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.Executors;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+
+  @Autowired
+  ApplicationProperties properties;
+
+
+  @Bean
+  public Consumer consumer()
+  {
+    Assert.hasText(properties.getBootstrapServer(), "seek.bootstrap-server must be set");
+    Assert.hasText(properties.getGroupId(), "seek.group-id must be set");
+    Assert.hasText(properties.getClientId(), "seek.client-id must be set");
+    Assert.hasText(properties.getTopic(), "seek.topic must be set");
+
+    return
+        new Consumer(
+            Executors.newFixedThreadPool(1),
+            properties.getBootstrapServer(),
+            properties.getGroupId(),
+            properties.getClientId(),
+            properties.getTopic());
+  }
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(Application.class, args);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/seek/ApplicationProperties.java b/src/main/java/de/juplo/kafka/seek/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..d098a03
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.seek;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "seek")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+  private String bootstrapServer;
+  private String groupId;
+  private String clientId;
+  private String topic;
+}
diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java
new file mode 100644 (file)
index 0000000..0c232d6
--- /dev/null
@@ -0,0 +1,119 @@
+package de.juplo.kafka.seek;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@Slf4j
+public class Consumer implements Runnable
+{
+  private final ExecutorService executor;
+  private final String id;
+  private final String topic;
+  private final KafkaConsumer<Long, String> consumer;
+
+  private boolean running = false;
+  Future<?> future = null;
+
+
+  public Consumer(
+      ExecutorService executor,
+      String bootstrapServer,
+      String groupId,
+      String clientId,
+      String topic)
+  {
+    this.executor = executor;
+
+    this.id = clientId;
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServer);
+    props.put("group.id", groupId);
+    props.put("client.id", clientId);
+    props.put("key.deserializer", LongDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
+
+    consumer = new KafkaConsumer<>(props);
+  }
+
+
+  @Override
+  public void run()
+  {
+    log.info("{} - Subscribing to topic test", id);
+    consumer.subscribe(Arrays.asList(topic));
+
+    try
+    {
+
+      running = true;
+
+      while (running)
+      {
+        ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+        for (ConsumerRecord<Long, String> record : records)
+          log.info(
+              "{} - {}: {}/{} - {}",
+              id,
+              record.offset(),
+              record.topic(),
+              record.partition(),
+              record.value()
+          );
+      }
+    }
+    catch(WakeupException e)
+    {
+      log.info("{} - RIIING!", id);
+    }
+    finally
+    {
+      log.info("{} - Unsubscribing...", id);
+      consumer.unsubscribe();
+      running = false;
+    }
+  }
+
+  public synchronized void start()
+  {
+    if (running)
+      throw new RuntimeException("Consumier instance " + id + " is already running!");
+
+    log.info("Running {}", id);
+    future = executor.submit(this);
+  }
+
+  public synchronized void stop() throws ExecutionException, InterruptedException
+  {
+    if (!running)
+      throw new RuntimeException("Consumier instance " + id + " is not running!");
+
+    log.info("Stopping {}", id);
+    running = false;
+    consumer.wakeup();
+    future.get();
+  }
+
+  @PreDestroy
+  public void destroy() throws ExecutionException, InterruptedException
+  {
+    stop();
+    log.info("{} - Closing the KafkaConsumer", id);
+    consumer.close(Duration.ofSeconds(3));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/seek/SeekController.java b/src/main/java/de/juplo/kafka/seek/SeekController.java
new file mode 100644 (file)
index 0000000..9a96c4a
--- /dev/null
@@ -0,0 +1,28 @@
+package de.juplo.kafka.seek;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.ExecutionException;
+
+
+@RestController
+@RequiredArgsConstructor
+public class SeekController
+{
+  private final Consumer consumer;
+
+
+  @PostMapping("start")
+  public void start()
+  {
+    consumer.start();
+  }
+
+  @PostMapping("stop")
+  public void stop() throws ExecutionException, InterruptedException
+  {
+    consumer.stop();
+  }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..7ea719a
--- /dev/null
@@ -0,0 +1,5 @@
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
diff --git a/src/test/java/de/juplo/kafka/seek/ApplicationTests.java b/src/test/java/de/juplo/kafka/seek/ApplicationTests.java
new file mode 100644 (file)
index 0000000..c466977
--- /dev/null
@@ -0,0 +1,17 @@
+package de.juplo.kafka.seek;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(properties = {
+    "seek.bootstrap-server=:9092",
+    "seek.topic=test",
+    "seek.id=peter" })
+public class ApplicationTests
+{
+  @Test
+  public void contextLoads() {}
+}