TMP
[demos/kafka/training] / src / main / java / de / juplo / kafka / DriverController.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import org.springframework.http.HttpStatus;
5 import org.springframework.http.ResponseEntity;
6 import org.springframework.web.bind.annotation.*;
7
8 import java.util.Map;
9 import java.util.concurrent.ExecutionException;
10
11
12 @RestController
13 @RequiredArgsConstructor
14 public class DriverController
15 {
16   private final EndlessConsumer consumer;
17   private final SumRecordHandler wordcount;
18
19
20   @PostMapping("start")
21   public void start()
22   {
23     consumer.start();
24   }
25
26   @PostMapping("stop")
27   public void stop() throws ExecutionException, InterruptedException
28   {
29     consumer.stop();
30   }
31
32
33   @GetMapping("seen")
34   public Map<Integer, Map<String, Map<String, Long>>> seen()
35   {
36     return wordcount.getSeen();
37   }
38
39   @GetMapping("seen/{user}")
40   public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
41   {
42     for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
43     {
44       Map<String, Long> words = users.get(user);
45       if (words != null)
46         return ResponseEntity.ok(words);
47     }
48
49     return ResponseEntity.notFound().build();
50   }
51
52
53   @ExceptionHandler
54   @ResponseStatus(HttpStatus.BAD_REQUEST)
55   public ErrorResponse illegalStateException(IllegalStateException e)
56   {
57     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
58   }
59 }