import com.google.common.primitives.Longs;
import org.apache.kafka.common.serialization.StringSerializer;
+import java.time.Clock;
import java.time.Duration;
+import java.time.LocalTime;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
-@Component
public class OutboxProducer
{
final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
+ private final Watermarks watermarks;
+ private final Clock clock;
+ private final Duration cleanupInterval;
private long sequenceNumber = 0l;
+ private LocalTime nextCleanup;
public OutboxProducer(
ApplicationProperties properties,
- OutboxRepository repository)
+ OutboxRepository repository,
+ Clock clock)
{
this.repository = repository;
this.producer = new KafkaProducer<>(props);
this.topic = properties.topic;
+
+ this.watermarks = new Watermarks();
+ this.clock = clock;
+ this.cleanupInterval = properties.cleanupInterval;
+ this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
}
@Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
LOG.debug("Polled {} new items", items.size());
for (OutboxItem item : items)
send(item);
+ if (nextCleanup.isBefore(LocalTime.now(clock)))
+ {
+ int deleted = repository.delete(watermarks.getLowest());
+ nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+ LOG.info(
+ "Cleaned up {} entries from outbox, next clean-up: {}",
+ deleted,
+ nextCleanup);
+ }
}
while (items.size() > 0);
}
{
if (metadata != null)
{
- int deleted = repository.delete(item.getSequenceNumber());
+ watermarks.set(metadata.partition(), item.getSequenceNumber());
LOG.info(
- "{}/{}:{} - {}:{}={} - deleted: {}",
+ "{}/{}:{} - {}:{}={}",
metadata.topic(),
metadata.partition(),
metadata.offset(),
item.getSequenceNumber(),
record.key(),
- record.value(),
- deleted);
+ record.value());
}
else
{