Old entries are removed from the outbox-table in batches
authorKai Moritz <kai@juplo.de>
Sun, 7 Feb 2021 13:18:13 +0000 (14:18 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 21:53:16 +0000 (23:53 +0200)
delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java [new file with mode: 0644]

index 6abd181..111c6b4 100644 (file)
@@ -3,14 +3,26 @@ package de.juplo.kafka.outbox.delivery;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
+import java.time.Clock;
+
 
 @SpringBootApplication
 @EnableConfigurationProperties(ApplicationProperties.class)
 @EnableScheduling
 public class Application
 {
+  @Bean
+  public OutboxProducer outboxProducer(
+      ApplicationProperties properties,
+      OutboxRepository repository)
+  {
+    return new OutboxProducer(properties, repository, Clock.systemDefaultZone());
+  }
+
+
   public static void main(String[] args) throws Exception
   {
     SpringApplication.run(Application.class, args);
index 4e36aa4..74f91d4 100644 (file)
@@ -4,6 +4,8 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import java.time.Duration;
+
 
 @ConfigurationProperties("de.juplo.kafka.outbox")
 @Getter
@@ -12,4 +14,5 @@ public class ApplicationProperties
 {
   String bootstrapServers = "localhost:9092";
   String topic = "outbox";
+  Duration cleanupInterval = Duration.ofSeconds(10);
 }
index 2ad4b7e..29d827a 100644 (file)
@@ -3,7 +3,9 @@ package de.juplo.kafka.outbox.delivery;
 import com.google.common.primitives.Longs;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.LocalTime;
 import java.util.List;
 import java.util.Properties;
 
@@ -12,7 +14,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 
@@ -20,7 +21,6 @@ import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
-@Component
 public class OutboxProducer
 {
   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
@@ -30,12 +30,17 @@ public class OutboxProducer
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
+  private final Watermarks watermarks;
+  private final Clock clock;
+  private final Duration cleanupInterval;
 
   private long sequenceNumber = 0l;
+  private LocalTime nextCleanup;
 
   public OutboxProducer(
       ApplicationProperties properties,
-      OutboxRepository repository)
+      OutboxRepository repository,
+      Clock clock)
   {
     this.repository = repository;
 
@@ -47,6 +52,11 @@ public class OutboxProducer
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
+
+    this.watermarks = new Watermarks();
+    this.clock = clock;
+    this.cleanupInterval = properties.cleanupInterval;
+    this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -59,6 +69,15 @@ public class OutboxProducer
       LOG.debug("Polled {} new items", items.size());
       for (OutboxItem item : items)
         send(item);
+      if (nextCleanup.isBefore(LocalTime.now(clock)))
+      {
+        int deleted = repository.delete(watermarks.getLowest());
+        nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+        LOG.info(
+            "Cleaned up {} entries from outbox, next clean-up: {}",
+            deleted,
+            nextCleanup);
+      }
     }
     while (items.size() > 0);
   }
@@ -75,16 +94,15 @@ public class OutboxProducer
     {
       if (metadata != null)
       {
-        int deleted = repository.delete(item.getSequenceNumber());
+        watermarks.set(metadata.partition(), item.getSequenceNumber());
         LOG.info(
-            "{}/{}:{} - {}:{}={} - deleted: {}",
+            "{}/{}:{} - {}:{}={}",
             metadata.topic(),
             metadata.partition(),
             metadata.offset(),
             item.getSequenceNumber(),
             record.key(),
-            record.value(),
-            deleted);
+            record.value());
       }
       else
       {
index abf2d1d..d6c814d 100644 (file)
@@ -19,7 +19,7 @@ public class OutboxRepository
   private static final String SQL_UPDATE =
       "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
   private static final String SQL_DELETE =
-      "DELETE FROM outbox WHERE id = :id";
+      "DELETE FROM outbox WHERE id <= :id";
 
   private final NamedParameterJdbcTemplate jdbcTemplate;
 
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java
new file mode 100644 (file)
index 0000000..8f071f9
--- /dev/null
@@ -0,0 +1,32 @@
+package de.juplo.kafka.outbox.delivery;
+
+
+public class Watermarks
+{
+  private long[] watermarks = new long[0];
+
+
+  public synchronized void set(int partition, long watermark)
+  {
+    if (partition >= watermarks.length)
+    {
+      long[] resized = new long[partition + 1];
+      for (int i = 0; i < watermarks.length; i++)
+        resized[i] = watermarks[i];
+      watermarks = resized;
+    }
+
+    watermarks[partition] = watermark;
+  }
+
+  public synchronized long getLowest()
+  {
+    long lowest = Long.MAX_VALUE;
+
+    for (int i = 0; i < watermarks.length; i++)
+      if (watermarks[i] < lowest)
+        lowest = watermarks[i];
+
+    return lowest;
+  }
+}