Old entries are removed from the outbox-table in batches
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / Watermarks.java
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java
new file mode 100644 (file)
index 0000000..8f071f9
--- /dev/null
@@ -0,0 +1,32 @@
+package de.juplo.kafka.outbox.delivery;
+
+
+public class Watermarks
+{
+  private long[] watermarks = new long[0];
+
+
+  public synchronized void set(int partition, long watermark)
+  {
+    if (partition >= watermarks.length)
+    {
+      long[] resized = new long[partition + 1];
+      for (int i = 0; i < watermarks.length; i++)
+        resized[i] = watermarks[i];
+      watermarks = resized;
+    }
+
+    watermarks[partition] = watermark;
+  }
+
+  public synchronized long getLowest()
+  {
+    long lowest = Long.MAX_VALUE;
+
+    for (int i = 0; i < watermarks.length; i++)
+      if (watermarks[i] < lowest)
+        lowest = watermarks[i];
+
+    return lowest;
+  }
+}