Version von SumUp-Requests, die einen fachlichen Fehler erzeugt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8
9 import java.util.Optional;
10
11
12 @RequiredArgsConstructor
13 @Slf4j
14 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
15 {
16   private final Producer<String, Object> producer;
17   private final Optional<Integer> errorPosition;
18   private final String id;
19   private final String topic;
20
21   private int counter = 0;
22
23
24   @Override
25   public void accept(ConsumerRecord<String, Integer> record)
26   {
27     String key = record.key();
28     int number = record.value();
29
30     for (int i = 1; i <= number; i++)
31     {
32       if (errorPosition.isPresent() && ++counter == errorPosition.get())
33       {
34         log.info("{} - Erzeuge fachlichen Fehler!");
35         send(key, new AddNumberMessage(number, counter * -1));
36       }
37       send(key, new AddNumberMessage(number, i));
38     }
39     send(key, new CalculateSumMessage(number));
40   }
41
42   private void send(String key, Object value)
43   {
44       final long time = System.currentTimeMillis();
45
46       final ProducerRecord<String, Object> record = new ProducerRecord<>(
47           topic,  // Topic
48           key,    // Key
49           value   // Value
50       );
51
52       producer.send(record, (metadata, e) ->
53       {
54         long now = System.currentTimeMillis();
55         if (e == null)
56         {
57           // HANDLE SUCCESS
58           log.debug(
59               "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
60               id,
61               record.key(),
62               record.value(),
63               metadata.partition(),
64               metadata.offset(),
65               metadata.timestamp(),
66               now - time
67           );
68         }
69         else
70         {
71           // HANDLE ERROR
72           log.error(
73               "{} - ERROR key={} timestamp={} latency={}ms: {}",
74               id,
75               record.key(),
76               metadata == null ? -1 : metadata.timestamp(),
77               now - time,
78               e.toString()
79           );
80         }
81       });
82
83       long now = System.currentTimeMillis();
84       log.trace(
85           "{} - Queued message with key={} latency={}ms",
86           id,
87           record.key(),
88           now - time
89       );
90   }
91 }