X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FWatermarks.java;fp=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FWatermarks.java;h=8f071f92504401ff1bb04e79a9607dde779559cb;hp=0000000000000000000000000000000000000000;hb=374dcbb90bcafe7682a8626de94ed0cd4c377e5e;hpb=cf23cb6c92a4a166ab9a8dff7d967a0bb2847378 diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java new file mode 100644 index 0000000..8f071f9 --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java @@ -0,0 +1,32 @@ +package de.juplo.kafka.outbox.delivery; + + +public class Watermarks +{ + private long[] watermarks = new long[0]; + + + public synchronized void set(int partition, long watermark) + { + if (partition >= watermarks.length) + { + long[] resized = new long[partition + 1]; + for (int i = 0; i < watermarks.length; i++) + resized[i] = watermarks[i]; + watermarks = resized; + } + + watermarks[partition] = watermark; + } + + public synchronized long getLowest() + { + long lowest = Long.MAX_VALUE; + + for (int i = 0; i < watermarks.length; i++) + if (watermarks[i] < lowest) + lowest = watermarks[i]; + + return lowest; + } +}