From a580ec85212fbe4a014d5c2d5c8839809f5935f5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 9 Apr 2022 21:37:46 +0200 Subject: [PATCH] Endopint zum abfragen der Offsets der zugeordneten Partitionen --- src/main/java/de/juplo/kafka/DriverController.java | 9 +++++++++ src/main/java/de/juplo/kafka/EndlessConsumer.java | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 93e2856..7847495 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,11 +1,14 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -34,4 +37,10 @@ public class DriverController { return consumer.getSeen(); } + + @GetMapping("offset/{partition}") + public ResponseEntity offset(@PathVariable("partition") Integer partition) + { + return ResponseEntity.of(consumer.getOffset(partition)); + } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 803c76e..35e836c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -179,6 +179,11 @@ public class EndlessConsumer implements Runnable future = executor.submit(this); } + public Optional getOffset(Integer partition) + { + return Optional.ofNullable(offsets.get(partition)); + } + public synchronized void stop() throws ExecutionException, InterruptedException { boolean stateChanged = running.compareAndSet(true, false); -- 2.20.1