Old entries are removed from the outbox-table in batches
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
index 2ad4b7e..29d827a 100644 (file)
@@ -3,7 +3,9 @@ package de.juplo.kafka.outbox.delivery;
 import com.google.common.primitives.Longs;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.LocalTime;
 import java.util.List;
 import java.util.Properties;
 
@@ -12,7 +14,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 
@@ -20,7 +21,6 @@ import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
-@Component
 public class OutboxProducer
 {
   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
@@ -30,12 +30,17 @@ public class OutboxProducer
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
+  private final Watermarks watermarks;
+  private final Clock clock;
+  private final Duration cleanupInterval;
 
   private long sequenceNumber = 0l;
+  private LocalTime nextCleanup;
 
   public OutboxProducer(
       ApplicationProperties properties,
-      OutboxRepository repository)
+      OutboxRepository repository,
+      Clock clock)
   {
     this.repository = repository;
 
@@ -47,6 +52,11 @@ public class OutboxProducer
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
+
+    this.watermarks = new Watermarks();
+    this.clock = clock;
+    this.cleanupInterval = properties.cleanupInterval;
+    this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -59,6 +69,15 @@ public class OutboxProducer
       LOG.debug("Polled {} new items", items.size());
       for (OutboxItem item : items)
         send(item);
+      if (nextCleanup.isBefore(LocalTime.now(clock)))
+      {
+        int deleted = repository.delete(watermarks.getLowest());
+        nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+        LOG.info(
+            "Cleaned up {} entries from outbox, next clean-up: {}",
+            deleted,
+            nextCleanup);
+      }
     }
     while (items.size() > 0);
   }
@@ -75,16 +94,15 @@ public class OutboxProducer
     {
       if (metadata != null)
       {
-        int deleted = repository.delete(item.getSequenceNumber());
+        watermarks.set(metadata.partition(), item.getSequenceNumber());
         LOG.info(
-            "{}/{}:{} - {}:{}={} - deleted: {}",
+            "{}/{}:{} - {}:{}={}",
             metadata.topic(),
             metadata.partition(),
             metadata.offset(),
             item.getSequenceNumber(),
             record.key(),
-            record.value(),
-            deleted);
+            record.value());
       }
       else
       {