Benennung vereinheitlicht und projektunabhängig gemacht
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644 (file)
index 0000000..a4b6a8f
--- /dev/null
@@ -0,0 +1,82 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Integer>
+{
+  private final Producer<String, String> producer;
+  private final String id;
+  private final String topic;
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Integer> record)
+  {
+    String key = record.key();
+    int number = record.value();
+
+    send(key, "START");
+    for (int i = 1; i <= number; i++)
+    {
+      send(key, Integer.toString(i));
+    }
+    send(key, "END");
+  }
+
+  private void send(String key, String value)
+  {
+      final long time = System.currentTimeMillis();
+
+      final ProducerRecord<String, String> record = new ProducerRecord<>(
+          topic,  // Topic
+          key,    // Key
+          value   // Value
+      );
+
+      producer.send(record, (metadata, e) ->
+      {
+        long now = System.currentTimeMillis();
+        if (e == null)
+        {
+          // HANDLE SUCCESS
+          log.debug(
+              "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+              id,
+              record.key(),
+              record.value(),
+              metadata.partition(),
+              metadata.offset(),
+              metadata.timestamp(),
+              now - time
+          );
+        }
+        else
+        {
+          // HANDLE ERROR
+          log.error(
+              "{} - ERROR key={} timestamp={} latency={}ms: {}",
+              id,
+              record.key(),
+              metadata == null ? -1 : metadata.timestamp(),
+              now - time,
+              e.toString()
+          );
+        }
+      });
+
+      long now = System.currentTimeMillis();
+      log.trace(
+          "{} - Queued message with key={} latency={}ms",
+          id,
+          record.key(),
+          now - time
+      );
+  }
+}