Moved postage of messages into a reusable standalone implementation
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
new file mode 100644 (file)
index 0000000..c08cae7
--- /dev/null
@@ -0,0 +1,105 @@
+package de.juplo.kafka.outbox.delivery;
+
+import com.google.common.primitives.Longs;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+  final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+  private final OutboxRepository repository;
+  private final KafkaProducer<String, String> producer;
+  private final String topic;
+
+  private long sequenceNumber = 0l;
+
+  public OutboxProducer(
+      ApplicationProperties properties,
+      OutboxRepository repository)
+  {
+    this.repository = repository;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.bootstrapServers);
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    this.producer = new KafkaProducer<>(props);
+    this.topic = properties.topic;
+  }
+
+  @Scheduled(fixedDelay = 500)
+  public void poll()
+  {
+    List<OutboxItem> items;
+    do
+    {
+      items = repository.fetch(sequenceNumber);
+      LOG.debug("Polled {} new items", items.size());
+      for (OutboxItem item : items)
+        send(item);
+    }
+    while (items.size() > 0);
+  }
+
+  void send(OutboxItem item)
+  {
+    final ProducerRecord<String, String> record =
+        new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+    sequenceNumber = item.getSequenceNumber();
+    record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
+
+    producer.send(record, (metadata, e) ->
+    {
+      if (metadata != null)
+      {
+        int deleted = repository.delete(item.getSequenceNumber());
+        LOG.info(
+            "{}/{}:{} - {}:{}={} - deleted: {}",
+            metadata.topic(),
+            metadata.partition(),
+            metadata.offset(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            deleted);
+      }
+      else
+      {
+        // HANDLE ERROR
+        LOG.error(
+            "{}/{} - {}:{}={} -> ",
+            record.topic(),
+            record.partition(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            e);
+      }
+    });
+  }
+
+
+  @PreDestroy
+  public void close()
+  {
+    producer.close(Duration.ofSeconds(5));
+  }
+}