Old entries are removed from the outbox-table in batches
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / Watermarks.java
1 package de.juplo.kafka.outbox.delivery;
2
3
4 public class Watermarks
5 {
6   private long[] watermarks = new long[0];
7
8
9   public synchronized void set(int partition, long watermark)
10   {
11     if (partition >= watermarks.length)
12     {
13       long[] resized = new long[partition + 1];
14       for (int i = 0; i < watermarks.length; i++)
15         resized[i] = watermarks[i];
16       watermarks = resized;
17     }
18
19     watermarks[partition] = watermark;
20   }
21
22   public synchronized long getLowest()
23   {
24     long lowest = Long.MAX_VALUE;
25
26     for (int i = 0; i < watermarks.length; i++)
27       if (watermarks[i] < lowest)
28         lowest = watermarks[i];
29
30     return lowest;
31   }
32 }