Benennung vereinheitlicht und projektunabhängig gemacht
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8
9
10 @RequiredArgsConstructor
11 @Slf4j
12 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
13 {
14   private final Producer<String, String> producer;
15   private final String id;
16   private final String topic;
17
18
19   @Override
20   public void accept(ConsumerRecord<String, Integer> record)
21   {
22     String key = record.key();
23     int number = record.value();
24
25     send(key, "START");
26     for (int i = 1; i <= number; i++)
27     {
28       send(key, Integer.toString(i));
29     }
30     send(key, "END");
31   }
32
33   private void send(String key, String value)
34   {
35       final long time = System.currentTimeMillis();
36
37       final ProducerRecord<String, String> record = new ProducerRecord<>(
38           topic,  // Topic
39           key,    // Key
40           value   // Value
41       );
42
43       producer.send(record, (metadata, e) ->
44       {
45         long now = System.currentTimeMillis();
46         if (e == null)
47         {
48           // HANDLE SUCCESS
49           log.debug(
50               "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
51               id,
52               record.key(),
53               record.value(),
54               metadata.partition(),
55               metadata.offset(),
56               metadata.timestamp(),
57               now - time
58           );
59         }
60         else
61         {
62           // HANDLE ERROR
63           log.error(
64               "{} - ERROR key={} timestamp={} latency={}ms: {}",
65               id,
66               record.key(),
67               metadata == null ? -1 : metadata.timestamp(),
68               now - time,
69               e.toString()
70           );
71         }
72       });
73
74       long now = System.currentTimeMillis();
75       log.trace(
76           "{} - Queued message with key={} latency={}ms",
77           id,
78           record.key(),
79           now - time
80       );
81   }
82 }