Benennung vereinheitlicht und projektunabhängig gemacht
[demos/kafka/training] / src / main / java / de / juplo / kafka / SumUpRecordHandler.java
diff --git a/src/main/java/de/juplo/kafka/SumUpRecordHandler.java b/src/main/java/de/juplo/kafka/SumUpRecordHandler.java
deleted file mode 100644 (file)
index 5d15b3b..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class SumUpRecordHandler implements RecordHandler<String, Integer>
-{
-  private final Producer<String, String> producer;
-  private final String id;
-  private final String topic;
-
-
-  @Override
-  public void accept(ConsumerRecord<String, Integer> record)
-  {
-    String key = record.key();
-    int number = record.value();
-
-    send(key, "START");
-    for (int i = 1; i <= number; i++)
-    {
-      send(key, Integer.toString(i));
-    }
-    send(key, "END");
-  }
-
-  private void send(String key, String value)
-  {
-      final long time = System.currentTimeMillis();
-
-      final ProducerRecord<String, String> record = new ProducerRecord<>(
-          topic,  // Topic
-          key,    // Key
-          value   // Value
-      );
-
-      producer.send(record, (metadata, e) ->
-      {
-        long now = System.currentTimeMillis();
-        if (e == null)
-        {
-          // HANDLE SUCCESS
-          log.debug(
-              "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-              id,
-              record.key(),
-              record.value(),
-              metadata.partition(),
-              metadata.offset(),
-              metadata.timestamp(),
-              now - time
-          );
-        }
-        else
-        {
-          // HANDLE ERROR
-          log.error(
-              "{} - ERROR key={} timestamp={} latency={}ms: {}",
-              id,
-              record.key(),
-              metadata == null ? -1 : metadata.timestamp(),
-              now - time,
-              e.toString()
-          );
-        }
-      });
-
-      long now = System.currentTimeMillis();
-      log.trace(
-          "{} - Queued message with key={} latency={}ms",
-          id,
-          record.key(),
-          now - time
-      );
-  }
-}