Verbesserungen & Korrekturen aus rest-producer übernommen
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.web.bind.annotation.*;
9 import org.springframework.web.context.request.async.DeferredResult;
10
11 import java.math.BigInteger;
12
13
14 @Slf4j
15 @RequestMapping
16 @ResponseBody
17 @RequiredArgsConstructor
18 public class RestProducer
19 {
20   private final String id;
21   private final String topic;
22   private final Integer partition;
23   private final KafkaProducer<String, Object> producer;
24
25   private long produced = 0;
26
27   @PostMapping(path = "{key}")
28   public DeferredResult<ProduceResult> send(
29       @PathVariable String key,
30       @RequestHeader(name = "X-id", required = false) Long correlationId,
31       @RequestBody Integer number)
32   {
33     ResultRecorder result = new ResultRecorder(number+1);
34
35     for (int i = 1; i <= number; i++)
36     {
37       send(key, new AddNumberMessage(number, i), correlationId, result);
38     }
39     send(key, new CalculateSumMessage(number), correlationId, result);
40
41     return result.getDeferredResult();
42   }
43
44   private void send(
45       String key,
46       Object value,
47       Long correlationId,
48       ResultRecorder result)
49   {
50     final long time = System.currentTimeMillis();
51
52     final ProducerRecord<String, Object> record = new ProducerRecord<>(
53         topic,  // Topic
54         partition, // Partition
55         key,    // Key
56         value   // Value
57     );
58
59     record.headers().add("source", id.getBytes());
60     if (correlationId != null)
61     {
62       record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
63     }
64
65     producer.send(record, (metadata, e) ->
66     {
67       long now = System.currentTimeMillis();
68       if (e == null)
69       {
70         // HANDLE SUCCESS
71         result.addSuccess(metadata);
72         produced++;
73         log.debug(
74             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
75             id,
76             record.key(),
77             record.value(),
78             metadata.partition(),
79             metadata.offset(),
80             metadata.timestamp(),
81             now - time
82         );
83       }
84       else
85       {
86         // HANDLE ERROR
87         result.addFailure(e);
88         log.error(
89             "{} - ERROR key={} timestamp={} latency={}ms: {}",
90             id,
91             record.key(),
92             metadata == null ? -1 : metadata.timestamp(),
93             now - time,
94             e.toString()
95         );
96       }
97     });
98
99     long now = System.currentTimeMillis();
100     log.trace(
101         "{} - Queued message with key={} latency={}ms",
102         id,
103         record.key(),
104         now - time
105     );
106   }
107
108   @ExceptionHandler
109   @ResponseStatus(HttpStatus.BAD_REQUEST)
110   public ErrorResponse illegalStateException(IllegalStateException e)
111   {
112     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
113   }
114 }