public class Watermarks
{
- private long[] watermarks = new long[0];
+ private final long[] watermarks;
- public synchronized void set(int partition, long watermark)
+ public Watermarks(int partitions)
{
- if (partition >= watermarks.length)
- {
- long[] resized = new long[partition + 1];
- for (int i = 0; i < watermarks.length; i++)
- resized[i] = watermarks[i];
- watermarks = resized;
- }
+ watermarks = new long[partitions];
+ }
+
+ public synchronized void set(int partition, long watermark)
+ {
watermarks[partition] = watermark;
}
return lowest;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < watermarks.length; i++)
+ {
+ builder.append("partition[");
+ builder.append(i);
+ builder.append("]=");
+ builder.append(watermarks[i]);
+ if (i != watermarks.length - 1)
+ builder.append(", ");
+ }
+ return builder.toString();
+ }
}