Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training] / src / main / java / de / juplo / kafka / DriverController.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import org.springframework.http.HttpStatus;
5 import org.springframework.http.ResponseEntity;
6 import org.springframework.web.bind.annotation.*;
7
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Optional;
11 import java.util.concurrent.ExecutionException;
12 import java.util.stream.Collectors;
13
14
15 @RestController
16 @RequiredArgsConstructor
17 public class DriverController
18 {
19   private final EndlessConsumer consumer;
20   private final ApplicationRecordHandler recordHandler;
21   private final AdderResults results;
22
23
24   @PostMapping("start")
25   public void start()
26   {
27     consumer.start();
28   }
29
30   @PostMapping("stop")
31   public void stop() throws ExecutionException, InterruptedException
32   {
33     consumer.stop();
34   }
35
36
37   @GetMapping("state")
38   public Map<Integer, Map<String, AdderResult>> state()
39   {
40     return
41         recordHandler
42             .getState()
43             .entrySet()
44             .stream()
45             .collect(Collectors.toMap(
46                 entry -> entry.getKey(),
47                 entry -> entry.getValue().getState()));
48   }
49
50   @GetMapping("state/{user}")
51   public ResponseEntity<Long> state(@PathVariable String user)
52   {
53     for (AdderBusinessLogic adder : recordHandler.getState().values())
54     {
55       Optional<Long> sum = adder.getSum(user);
56       if (sum.isPresent())
57         return ResponseEntity.ok(sum.get());
58     }
59
60     return ResponseEntity.notFound().build();
61   }
62
63   @GetMapping("results")
64   public Map<Integer, Map<String, List<AdderResult>>> results()
65   {
66     return results.getState();
67   }
68
69   @GetMapping("results/{user}")
70   public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
71   {
72     for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
73     {
74       List<AdderResult> results = resultsByUser.get(user);
75       if (results != null)
76         return ResponseEntity.ok(results);
77     }
78
79     return ResponseEntity.notFound().build();
80   }
81
82
83   @ExceptionHandler
84   @ResponseStatus(HttpStatus.BAD_REQUEST)
85   public ErrorResponse illegalStateException(IllegalStateException e)
86   {
87     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
88   }
89 }