WIP
authorKai Moritz <kai@juplo.de>
Tue, 18 May 2021 22:44:59 +0000 (00:44 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 18 May 2021 22:44:59 +0000 (00:44 +0200)
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/seek/Consumer.java
src/main/java/de/juplo/kafka/seek/SeekController.java
src/test/java/de/juplo/kafka/seek/ApplicationTests.java

index c6562e1..383c4dd 100644 (file)
@@ -31,7 +31,8 @@ services:
       do
         export A=$$(($$A + 1));
         echo -n $$A;
-        echo $$A | kafkacat -b kafka:9093 -t foo -k $$A%7;
+        echo $$A | kafkacat -b kafka:9093 -t test -k $$A%7;
+        sleep 1;
       done'
     tty: true
 
diff --git a/pom.xml b/pom.xml
index 922d525..f18136b 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-actuator</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-json</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-configuration-processor</artifactId>
index 0c232d6..1fb4cdd 100644 (file)
@@ -4,8 +4,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import javax.annotation.PreDestroy;
@@ -45,7 +45,7 @@ public class Consumer implements Runnable
     props.put("bootstrap.servers", bootstrapServer);
     props.put("group.id", groupId);
     props.put("client.id", clientId);
-    props.put("key.deserializer", LongDeserializer.class.getName());
+    props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
     consumer = new KafkaConsumer<>(props);
@@ -81,6 +81,10 @@ public class Consumer implements Runnable
     {
       log.info("{} - RIIING!", id);
     }
+    catch(Exception e)
+    {
+      log.error("{} - Unexpected error: {}", id, e.toString());
+    }
     finally
     {
       log.info("{} - Unsubscribing...", id);
@@ -89,6 +93,18 @@ public class Consumer implements Runnable
     }
   }
 
+
+  public void seek(long offset)
+  {
+    consumer
+        .partitionsFor(topic)
+        .forEach(partition ->
+            consumer.seek(
+                new TopicPartition(topic, partition.partition()),
+                offset));
+  }
+
+
   public synchronized void start()
   {
     if (running)
@@ -109,6 +125,7 @@ public class Consumer implements Runnable
     future.get();
   }
 
+
   @PreDestroy
   public void destroy() throws ExecutionException, InterruptedException
   {
index 9a96c4a..24cb6c1 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.seek;
 
 import lombok.RequiredArgsConstructor;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.concurrent.ExecutionException;
@@ -25,4 +26,11 @@ public class SeekController
   {
     consumer.stop();
   }
+
+
+  @PostMapping("seek")
+  public void seek(@RequestBody long offset)
+  {
+    consumer.seek(offset);
+  }
 }
index c466977..5e1ca66 100644 (file)
@@ -8,8 +8,9 @@ import org.springframework.test.context.junit4.SpringRunner;
 @RunWith(SpringRunner.class)
 @SpringBootTest(properties = {
     "seek.bootstrap-server=:9092",
-    "seek.topic=test",
-    "seek.id=peter" })
+    "seek.group-id=test",
+    "seek.client-id=peter",
+    "seek.topic=test" })
 public class ApplicationTests
 {
   @Test