Merge branch 'endless-stream-consumer' into rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 20:15:34 +0000 (22:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 20:15:34 +0000 (22:15 +0200)
1  2 
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

Simple merge
diff --cc pom.xml
Simple merge
index 0000000,0000000..ab9782c
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,32 @@@
++package de.juplo.kafka;
++
++import lombok.RequiredArgsConstructor;
++import org.springframework.boot.actuate.health.Health;
++import org.springframework.boot.actuate.health.HealthIndicator;
++import org.springframework.stereotype.Component;
++
++
++@Component
++@RequiredArgsConstructor
++public class ApplicationHealthIndicator implements HealthIndicator
++{
++  private final EndlessConsumer consumer;
++
++
++  @Override
++  public Health health()
++  {
++    try
++    {
++      return consumer
++          .exitStatus()
++          .map(Health::down)
++          .orElse(Health.outOfService())
++          .build();
++    }
++    catch (IllegalStateException e)
++    {
++      return Health.up().build();
++    }
++  }
++}
@@@ -1,11 -1,12 +1,14 @@@
  package de.juplo.kafka;
  
  import lombok.RequiredArgsConstructor;
+ import org.springframework.http.HttpStatus;
+ import org.springframework.web.bind.annotation.ExceptionHandler;
 +import org.springframework.web.bind.annotation.GetMapping;
  import org.springframework.web.bind.annotation.PostMapping;
+ import org.springframework.web.bind.annotation.ResponseStatus;
  import org.springframework.web.bind.annotation.RestController;
  
 +import java.util.Map;
  import java.util.concurrent.ExecutionException;
  
  
@@@ -28,10 -29,10 +31,16 @@@ public class DriverControlle
      consumer.stop();
    }
  
 +  @GetMapping("seen")
 +  public Map<Integer, Map<String, Integer>> seen()
 +  {
 +    return consumer.getSeen();
 +  }
++
+   @ExceptionHandler
+   @ResponseStatus(HttpStatus.BAD_REQUEST)
+   public ErrorResponse illegalStateException(IllegalStateException e)
+   {
+     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+   }
  }