X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FWatermarks.java;fp=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FWatermarks.java;h=4bd0c9ea435e4b8d3f7e0b1a611a9c5522b78692;hp=8f071f92504401ff1bb04e79a9607dde779559cb;hb=a92d318043bf698dc2a949db2b76893c8abf03a1;hpb=374dcbb90bcafe7682a8626de94ed0cd4c377e5e diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java index 8f071f9..4bd0c9e 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java @@ -3,19 +3,17 @@ package de.juplo.kafka.outbox.delivery; public class Watermarks { - private long[] watermarks = new long[0]; + private final long[] watermarks; - public synchronized void set(int partition, long watermark) + public Watermarks(int partitions) { - if (partition >= watermarks.length) - { - long[] resized = new long[partition + 1]; - for (int i = 0; i < watermarks.length; i++) - resized[i] = watermarks[i]; - watermarks = resized; - } + watermarks = new long[partitions]; + } + + public synchronized void set(int partition, long watermark) + { watermarks[partition] = watermark; } @@ -29,4 +27,20 @@ public class Watermarks return lowest; } + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < watermarks.length; i++) + { + builder.append("partition["); + builder.append(i); + builder.append("]="); + builder.append(watermarks[i]); + if (i != watermarks.length - 1) + builder.append(", "); + } + return builder.toString(); + } }