From: Kai Moritz Date: Sat, 9 Apr 2022 19:37:46 +0000 (+0200) Subject: Endopint zum abfragen der Offsets der zugeordneten Partitionen X-Git-Tag: offset-endpoint X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a580ec85212fbe4a014d5c2d5c8839809f5935f5;p=demos%2Fkafka%2Ftraining Endopint zum abfragen der Offsets der zugeordneten Partitionen --- diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 93e2856..7847495 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,11 +1,14 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -34,4 +37,10 @@ public class DriverController { return consumer.getSeen(); } + + @GetMapping("offset/{partition}") + public ResponseEntity offset(@PathVariable("partition") Integer partition) + { + return ResponseEntity.of(consumer.getOffset(partition)); + } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 803c76e..35e836c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -179,6 +179,11 @@ public class EndlessConsumer implements Runnable future = executor.submit(this); } + public Optional getOffset(Integer partition) + { + return Optional.ofNullable(offsets.get(partition)); + } + public synchronized void stop() throws ExecutionException, InterruptedException { boolean stateChanged = running.compareAndSet(true, false);