WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / DriverController.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import org.springframework.http.HttpStatus;
5 import org.springframework.http.ResponseEntity;
6 import org.springframework.web.bind.annotation.*;
7
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Optional;
11 import java.util.concurrent.ExecutionException;
12 import java.util.stream.Collectors;
13
14
15 @RestController
16 @RequiredArgsConstructor
17 public class DriverController
18 {
19   private final EndlessConsumer consumer;
20   private final SumRecordHandler sumRecordHandler;
21
22
23   @PostMapping("start")
24   public void start()
25   {
26     consumer.start();
27   }
28
29   @PostMapping("stop")
30   public void stop() throws ExecutionException, InterruptedException
31   {
32     consumer.stop();
33   }
34
35
36   @GetMapping("state")
37   public Map<Integer, Map<String, Long>> state()
38   {
39     return
40         sumRecordHandler
41             .getState()
42             .entrySet()
43             .stream()
44             .collect(Collectors.toMap(
45                 entry -> entry.getKey(),
46                 entry -> entry.getValue().getState()));
47   }
48
49   @GetMapping("state/{user}")
50   public ResponseEntity<Long> seen(@PathVariable String user)
51   {
52     for (SumBusinessLogic sumBusinessLogic : sumRecordHandler.getState().values())
53     {
54       Optional<Long> sum = sumBusinessLogic.getSum(user);
55       if (sum.isPresent())
56         return ResponseEntity.ok(sum.get());
57     }
58
59     return ResponseEntity.notFound().build();
60   }
61
62
63   @ExceptionHandler
64   @ResponseStatus(HttpStatus.BAD_REQUEST)
65   public ErrorResponse illegalStateException(IllegalStateException e)
66   {
67     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
68   }
69 }