package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
+import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
{
return consumer.getSeen();
}
+
+ @GetMapping("offset/{partition}")
+ public ResponseEntity<Long> offset(@PathVariable("partition") Integer partition)
+ {
+ return ResponseEntity.of(consumer.getOffset(partition));
+ }
}
future = executor.submit(this);
}
+ public Optional<Long> getOffset(Integer partition)
+ {
+ return Optional.ofNullable(offsets.get(partition));
+ }
+
public synchronized void stop() throws ExecutionException, InterruptedException
{
boolean stateChanged = running.compareAndSet(true, false);