Versendete Nachrichten an den neuen Kontrakt angepasst
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8
9
10 @RequiredArgsConstructor
11 @Slf4j
12 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
13 {
14   private final Producer<String, String> producer;
15   private final String id;
16   private final String topic;
17
18
19   @Override
20   public void accept(ConsumerRecord<String, Integer> record)
21   {
22     String key = record.key();
23     int number = record.value();
24
25     for (int i = 1; i <= number; i++)
26     {
27       send(key, Integer.toString(i));
28     }
29     send(key, "CALCULATE");
30   }
31
32   private void send(String key, String value)
33   {
34       final long time = System.currentTimeMillis();
35
36       final ProducerRecord<String, String> record = new ProducerRecord<>(
37           topic,  // Topic
38           key,    // Key
39           value   // Value
40       );
41
42       producer.send(record, (metadata, e) ->
43       {
44         long now = System.currentTimeMillis();
45         if (e == null)
46         {
47           // HANDLE SUCCESS
48           log.debug(
49               "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
50               id,
51               record.key(),
52               record.value(),
53               metadata.partition(),
54               metadata.offset(),
55               metadata.timestamp(),
56               now - time
57           );
58         }
59         else
60         {
61           // HANDLE ERROR
62           log.error(
63               "{} - ERROR key={} timestamp={} latency={}ms: {}",
64               id,
65               record.key(),
66               metadata == null ? -1 : metadata.timestamp(),
67               now - time,
68               e.toString()
69           );
70         }
71       });
72
73       long now = System.currentTimeMillis();
74       log.trace(
75           "{} - Queued message with key={} latency={}ms",
76           id,
77           record.key(),
78           now - time
79       );
80   }
81 }