fdae76f979c98aa6016af81b1b0c12ba0666c405
[demos/kafka/training] / src / main / java / de / juplo / kafka / DriverController.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import org.springframework.http.HttpStatus;
5 import org.springframework.http.ResponseEntity;
6 import org.springframework.web.bind.annotation.*;
7
8 import java.util.List;
9 import java.util.Map;
10 import java.util.concurrent.ExecutionException;
11
12
13 @RestController
14 @RequiredArgsConstructor
15 public class DriverController
16 {
17   private final EndlessConsumer consumer;
18   private final SumRecordHandler sumRecordHandler;
19
20
21   @PostMapping("start")
22   public void start()
23   {
24     consumer.start();
25   }
26
27   @PostMapping("stop")
28   public void stop() throws ExecutionException, InterruptedException
29   {
30     consumer.stop();
31   }
32
33
34   @GetMapping("seen")
35   public Map<Integer, Map<String, List<Long>>> seen()
36   {
37     return sumRecordHandler.getSeen();
38   }
39
40   @GetMapping("seen/{user}")
41   public ResponseEntity<List<Long>> seen(@PathVariable String user)
42   {
43     for (Map<String, List<Long>> users : sumRecordHandler.getSeen().values())
44     {
45       List<Long> results = users.get(user);
46       if (results != null)
47         return ResponseEntity.ok(results);
48     }
49
50     return ResponseEntity.notFound().build();
51   }
52
53
54   @ExceptionHandler
55   @ResponseStatus(HttpStatus.BAD_REQUEST)
56   public ErrorResponse illegalStateException(IllegalStateException e)
57   {
58     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
59   }
60 }