--- /dev/null
--- /dev/null
++package de.juplo.kafka;
++
++import lombok.RequiredArgsConstructor;
++import org.springframework.boot.actuate.health.Health;
++import org.springframework.boot.actuate.health.HealthIndicator;
++import org.springframework.stereotype.Component;
++
++
++@Component
++@RequiredArgsConstructor
++public class ApplicationHealthIndicator implements HealthIndicator
++{
++ private final EndlessConsumer consumer;
++
++
++ @Override
++ public Health health()
++ {
++ try
++ {
++ return consumer
++ .exitStatus()
++ .map(Health::down)
++ .orElse(Health.outOfService())
++ .build();
++ }
++ catch (IllegalStateException e)
++ {
++ return Health.up().build();
++ }
++ }
++}
package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
+ import org.springframework.http.HttpStatus;
+ import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
+ import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
consumer.stop();
}
-
+ @GetMapping("seen")
+ public Map<Integer, Map<String, Integer>> seen()
+ {
+ return consumer.getSeen();
+ }
++
+ @ExceptionHandler
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ public ErrorResponse illegalStateException(IllegalStateException e)
+ {
+ return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+ }
}