private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
- private final Watermarks watermarks;
+ private final Set<Long> send = new HashSet<>();
private final Clock clock;
private final Duration cleanupInterval;
LOG.info("Using topic {} with {} partitions", topic, partitions);
- this.watermarks = new Watermarks(partitions.size());
-
long[] currentOffsets = new long[partitions.size()];
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
{
}
LOG.info("End-offsets: {}", endOffsets);
+ int deleted = 0;
while(!Arrays.equals(currentOffsets, endOffsets))
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->
{
long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
- LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
- watermarks.set(record.partition(), recordSequenceNumber);
+ LOG.debug(
+ "Found message #{} on offset {} of partition {}",
+ recordSequenceNumber,
+ record.offset(),
+ record.partition());
+ send.add(recordSequenceNumber);
currentOffsets[record.partition()] = record.offset();
});
+ deleted += cleanUp();
LOG.debug("Current offsets: {}", currentOffsets);
}
- LOG.info("Found watermarks: {}", watermarks);
-
- sequenceNumber = watermarks.getLowest();
- LOG.info("Restored sequence-number: {}", sequenceNumber);
+ LOG.info("Cleaned up {} already send entries from outbox table", deleted);
consumer.close();
this.clock = clock;
this.cleanupInterval = properties.cleanupInterval;
- this.nextCleanup = LocalTime.now(clock);
+ this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
}
@Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
send(item);
if (nextCleanup.isBefore(LocalTime.now(clock)))
{
- int deleted = repository.delete(watermarks.getLowest());
+ cleanUp();
nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
- LOG.info(
- "Cleaned up {} entries from outbox, next clean-up: {}",
- deleted,
- nextCleanup);
+ LOG.debug("Next clean-up: {}", nextCleanup);
}
}
while (items.size() > 0);
}
+ int cleanUp()
+ {
+ int deleted = repository.delete(send);
+ LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+ send.clear();
+ return deleted;
+ }
+
void send(OutboxItem item)
{
final ProducerRecord<String, String> record =
{
if (metadata != null)
{
- watermarks.set(metadata.partition(), item.getSequenceNumber());
+ send.add(item.getSequenceNumber());
LOG.info(
"{}/{}:{} - {}:{}={}",
metadata.topic(),