private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
- private final Watermarks watermarks;
+ private final Set<Long> send = new HashSet<>();
private final Clock clock;
private final Duration cleanupInterval;
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(assignment);
- this.watermarks = new Watermarks(partitions.size());
-
long[] currentOffsets = new long[partitions.size()];
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
{
}
LOG.info("End-offsets: {}", endOffsets);
+ int deleted = 0;
while(!Arrays.equals(currentOffsets, endOffsets))
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->
{
long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
- LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
- watermarks.set(record.partition(), recordSequenceNumber);
+ LOG.debug(
+ "Found message #{} on offset {} of partition {}",
+ recordSequenceNumber,
+ record.offset(),
+ record.partition());
+ send.add(recordSequenceNumber);
currentOffsets[record.partition()] = record.offset();
});
+ deleted += cleanUp();
LOG.debug("Current offsets: {}", currentOffsets);
}
- LOG.info("Found watermarks: {}", watermarks);
-
- sequenceNumber = watermarks.getLowest();
- LOG.info("Restored sequence-number: {}", sequenceNumber);
+ LOG.info("Cleaned up {} already send entries from outbox table", deleted);
consumer.close();
this.clock = clock;
this.cleanupInterval = properties.cleanupInterval;
- this.nextCleanup = LocalTime.now(clock);
+ this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
}
@Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
send(item);
if (nextCleanup.isBefore(LocalTime.now(clock)))
{
- int deleted = repository.delete(watermarks.getLowest());
+ cleanUp();
nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
- LOG.info(
- "Cleaned up {} entries from outbox, next clean-up: {}",
- deleted,
- nextCleanup);
+ LOG.debug("Next clean-up: {}", nextCleanup);
}
}
while (items.size() > 0);
}
+ int cleanUp()
+ {
+ int deleted = repository.delete(send);
+ LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+ send.clear();
+ return deleted;
+ }
+
void send(OutboxItem item)
{
final ProducerRecord<String, String> record =
{
if (metadata != null)
{
- watermarks.set(metadata.partition(), item.getSequenceNumber());
+ send.add(item.getSequenceNumber());
LOG.info(
"{}/{}:{} - {}:{}={}",
metadata.topic(),
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.util.List;
+import java.util.Set;
@Repository
private static final String SQL_UPDATE =
"INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
private static final String SQL_DELETE =
- "DELETE FROM outbox WHERE id <= :id";
+ "DELETE FROM outbox WHERE id IN (:ids)";
private final NamedParameterJdbcTemplate jdbcTemplate;
jdbcTemplate.update(SQL_UPDATE, parameters);
}
- public int delete(Long id)
+ public int delete(Set<Long> ids)
{
+ if (ids == null || ids.isEmpty())
+ return 0;
MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("id", id);
+ parameters.addValue("ids", ids);
return jdbcTemplate.update(SQL_DELETE, parameters);
}
+++ /dev/null
-package de.juplo.kafka.outbox.delivery;
-
-
-public class Watermarks
-{
- private final long[] watermarks;
-
-
- public Watermarks(int partitions)
- {
- watermarks = new long[partitions];
- }
-
-
- public synchronized void set(int partition, long watermark)
- {
- watermarks[partition] = watermark;
- }
-
- public synchronized long getLowest()
- {
- long lowest = Long.MAX_VALUE;
-
- for (int i = 0; i < watermarks.length; i++)
- if (watermarks[i] < lowest)
- lowest = watermarks[i];
-
- return lowest;
- }
-
- @Override
- public String toString()
- {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < watermarks.length; i++)
- {
- builder.append("partition[");
- builder.append(i);
- builder.append("]=");
- builder.append(watermarks[i]);
- if (i != watermarks.length - 1)
- builder.append(", ");
- }
- return builder.toString();
- }
-}