Implementierung des Versands der Header ergänzt
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.Producer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.web.bind.annotation.*;
10 import org.springframework.web.context.request.async.DeferredResult;
11
12 import java.math.BigInteger;
13
14
15 @Slf4j
16 @RequestMapping
17 @ResponseBody
18 @RequiredArgsConstructor
19 public class RestProducer
20 {
21   private final String id;
22   private final String topic;
23   private final Integer partition;
24   private final Producer<String, Object> producer;
25
26   private long produced = 0;
27
28   @PostMapping(path = "{key}")
29   public DeferredResult<ProduceResult> send(
30       @PathVariable String key,
31       @RequestHeader(name = "X-id", required = false) Long correlationId,
32       @RequestBody Integer number)
33   {
34     ResultRecorder result = new ResultRecorder(number+1);
35
36     for (int i = 1; i <= number; i++)
37     {
38       send(key, new AddNumberMessage(number, i), correlationId, result);
39     }
40     send(key, new CalculateSumMessage(number), correlationId, result);
41
42     return result.getDeferredResult();
43   }
44
45   private void send(
46       String key,
47       Object value,
48       Long correlationId,
49       ResultRecorder result)
50   {
51     final long time = System.currentTimeMillis();
52
53     final ProducerRecord<String, Object> record = new ProducerRecord<>(
54         topic,  // Topic
55         partition, // Partition
56         key,    // Key
57         value   // Value
58     );
59
60     record.headers().add("source", id.getBytes());
61     if (correlationId != null)
62     {
63       record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
64     }
65
66     producer.send(record, (metadata, e) ->
67     {
68       long now = System.currentTimeMillis();
69       if (e == null)
70       {
71         // HANDLE SUCCESS
72         result.addSuccess(metadata);
73         produced++;
74         log.debug(
75             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
76             id,
77             record.key(),
78             record.value(),
79             metadata.partition(),
80             metadata.offset(),
81             metadata.timestamp(),
82             now - time
83         );
84       }
85       else
86       {
87         // HANDLE ERROR
88         result.addFailure(e);
89         log.error(
90             "{} - ERROR key={} timestamp={} latency={}ms: {}",
91             id,
92             record.key(),
93             metadata == null ? -1 : metadata.timestamp(),
94             now - time,
95             e.toString()
96         );
97       }
98     });
99
100     long now = System.currentTimeMillis();
101     log.trace(
102         "{} - Queued message with key={} latency={}ms",
103         id,
104         record.key(),
105         now - time
106     );
107   }
108
109   @ExceptionHandler
110   @ResponseStatus(HttpStatus.BAD_REQUEST)
111   public ErrorResponse illegalStateException(IllegalStateException e)
112   {
113     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
114   }
115 }