X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumUpRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSumUpRecordHandler.java;h=0000000000000000000000000000000000000000;hb=77cbd2b55e26b39be90a722761ebb6e59e8f965a;hp=5d15b3b0611bc795b11d7d613ab19f929665ff36;hpb=41e5f74b40e4a434483dcc4142aaf8224ea5a478;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SumUpRecordHandler.java b/src/main/java/de/juplo/kafka/SumUpRecordHandler.java deleted file mode 100644 index 5d15b3b..0000000 --- a/src/main/java/de/juplo/kafka/SumUpRecordHandler.java +++ /dev/null @@ -1,82 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; - - -@RequiredArgsConstructor -@Slf4j -public class SumUpRecordHandler implements RecordHandler -{ - private final Producer producer; - private final String id; - private final String topic; - - - @Override - public void accept(ConsumerRecord record) - { - String key = record.key(); - int number = record.value(); - - send(key, "START"); - for (int i = 1; i <= number; i++) - { - send(key, Integer.toString(i)); - } - send(key, "END"); - } - - private void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message with key={} latency={}ms", - id, - record.key(), - now - time - ); - } -}