From 2022198b31ce426538388105dc26114ad393739b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 19 May 2021 00:44:59 +0200 Subject: [PATCH] WIP --- docker-compose.yml | 3 ++- pom.xml | 4 ---- .../java/de/juplo/kafka/seek/Consumer.java | 21 +++++++++++++++++-- .../de/juplo/kafka/seek/SeekController.java | 8 +++++++ .../de/juplo/kafka/seek/ApplicationTests.java | 5 +++-- 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c6562e1..383c4dd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,8 @@ services: do export A=$$(($$A + 1)); echo -n $$A; - echo $$A | kafkacat -b kafka:9093 -t foo -k $$A%7; + echo $$A | kafkacat -b kafka:9093 -t test -k $$A%7; + sleep 1; done' tty: true diff --git a/pom.xml b/pom.xml index 922d525..f18136b 100644 --- a/pom.xml +++ b/pom.xml @@ -30,10 +30,6 @@ org.springframework.boot spring-boot-starter-actuator - - org.springframework.boot - spring-boot-starter-json - org.springframework.boot spring-boot-configuration-processor diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java index 0c232d6..1fb4cdd 100644 --- a/src/main/java/de/juplo/kafka/seek/Consumer.java +++ b/src/main/java/de/juplo/kafka/seek/Consumer.java @@ -4,8 +4,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; @@ -45,7 +45,7 @@ public class Consumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", clientId); - props.put("key.deserializer", LongDeserializer.class.getName()); + props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); consumer = new KafkaConsumer<>(props); @@ -81,6 +81,10 @@ public class Consumer implements Runnable { log.info("{} - RIIING!", id); } + catch(Exception e) + { + log.error("{} - Unexpected error: {}", id, e.toString()); + } finally { log.info("{} - Unsubscribing...", id); @@ -89,6 +93,18 @@ public class Consumer implements Runnable } } + + public void seek(long offset) + { + consumer + .partitionsFor(topic) + .forEach(partition -> + consumer.seek( + new TopicPartition(topic, partition.partition()), + offset)); + } + + public synchronized void start() { if (running) @@ -109,6 +125,7 @@ public class Consumer implements Runnable future.get(); } + @PreDestroy public void destroy() throws ExecutionException, InterruptedException { diff --git a/src/main/java/de/juplo/kafka/seek/SeekController.java b/src/main/java/de/juplo/kafka/seek/SeekController.java index 9a96c4a..24cb6c1 100644 --- a/src/main/java/de/juplo/kafka/seek/SeekController.java +++ b/src/main/java/de/juplo/kafka/seek/SeekController.java @@ -2,6 +2,7 @@ package de.juplo.kafka.seek; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; @@ -25,4 +26,11 @@ public class SeekController { consumer.stop(); } + + + @PostMapping("seek") + public void seek(@RequestBody long offset) + { + consumer.seek(offset); + } } diff --git a/src/test/java/de/juplo/kafka/seek/ApplicationTests.java b/src/test/java/de/juplo/kafka/seek/ApplicationTests.java index c466977..5e1ca66 100644 --- a/src/test/java/de/juplo/kafka/seek/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/seek/ApplicationTests.java @@ -8,8 +8,9 @@ import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest(properties = { "seek.bootstrap-server=:9092", - "seek.topic=test", - "seek.id=peter" }) + "seek.group-id=test", + "seek.client-id=peter", + "seek.topic=test" }) public class ApplicationTests { @Test -- 2.20.1