Vorlage
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8
9
10 @RequiredArgsConstructor
11 @Slf4j
12 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
13 {
14   private final Producer<String, Object> producer;
15   private final String id;
16   private final String topic;
17
18
19   @Override
20   public void accept(ConsumerRecord<String, Integer> record)
21   {
22     String key = record.key();
23     int number = record.value();
24
25     // TODO: JSON-Nachrichten verschicken
26   }
27
28   private void send(String key, Object value)
29   {
30       final long time = System.currentTimeMillis();
31
32       final ProducerRecord<String, Object> record = new ProducerRecord<>(
33           topic,  // Topic
34           key,    // Key
35           value   // Value
36       );
37
38       producer.send(record, (metadata, e) ->
39       {
40         long now = System.currentTimeMillis();
41         if (e == null)
42         {
43           // HANDLE SUCCESS
44           log.debug(
45               "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
46               id,
47               record.key(),
48               record.value(),
49               metadata.partition(),
50               metadata.offset(),
51               metadata.timestamp(),
52               now - time
53           );
54         }
55         else
56         {
57           // HANDLE ERROR
58           log.error(
59               "{} - ERROR key={} timestamp={} latency={}ms: {}",
60               id,
61               record.key(),
62               metadata == null ? -1 : metadata.timestamp(),
63               now - time,
64               e.toString()
65           );
66         }
67       });
68
69       long now = System.currentTimeMillis();
70       log.trace(
71           "{} - Queued message with key={} latency={}ms",
72           id,
73           record.key(),
74           now - time
75       );
76   }
77 }