import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
+import java.time.Clock;
+
@SpringBootApplication
@EnableConfigurationProperties(ApplicationProperties.class)
@EnableScheduling
public class Application
{
+ @Bean
+ public OutboxProducer outboxProducer(
+ ApplicationProperties properties,
+ OutboxRepository repository)
+ {
+ return new OutboxProducer(properties, repository, Clock.systemDefaultZone());
+ }
+
+
public static void main(String[] args) throws Exception
{
SpringApplication.run(Application.class, args);
import com.google.common.primitives.Longs;
import org.apache.kafka.common.serialization.StringSerializer;
+import java.time.Clock;
import java.time.Duration;
+import java.time.LocalTime;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
-@Component
public class OutboxProducer
{
final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
+ private final Watermarks watermarks;
+ private final Clock clock;
+ private final Duration cleanupInterval;
private long sequenceNumber = 0l;
+ private LocalTime nextCleanup;
public OutboxProducer(
ApplicationProperties properties,
- OutboxRepository repository)
+ OutboxRepository repository,
+ Clock clock)
{
this.repository = repository;
this.producer = new KafkaProducer<>(props);
this.topic = properties.topic;
+
+ this.watermarks = new Watermarks();
+ this.clock = clock;
+ this.cleanupInterval = properties.cleanupInterval;
+ this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
}
@Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
LOG.debug("Polled {} new items", items.size());
for (OutboxItem item : items)
send(item);
+ if (nextCleanup.isBefore(LocalTime.now(clock)))
+ {
+ int deleted = repository.delete(watermarks.getLowest());
+ nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+ LOG.info(
+ "Cleaned up {} entries from outbox, next clean-up: {}",
+ deleted,
+ nextCleanup);
+ }
}
while (items.size() > 0);
}
{
if (metadata != null)
{
- int deleted = repository.delete(item.getSequenceNumber());
+ watermarks.set(metadata.partition(), item.getSequenceNumber());
LOG.info(
- "{}/{}:{} - {}:{}={} - deleted: {}",
+ "{}/{}:{} - {}:{}={}",
metadata.topic(),
metadata.partition(),
metadata.offset(),
item.getSequenceNumber(),
record.key(),
- record.value(),
- deleted);
+ record.value());
}
else
{
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+
+public class Watermarks
+{
+ private long[] watermarks = new long[0];
+
+
+ public synchronized void set(int partition, long watermark)
+ {
+ if (partition >= watermarks.length)
+ {
+ long[] resized = new long[partition + 1];
+ for (int i = 0; i < watermarks.length; i++)
+ resized[i] = watermarks[i];
+ watermarks = resized;
+ }
+
+ watermarks[partition] = watermark;
+ }
+
+ public synchronized long getLowest()
+ {
+ long lowest = Long.MAX_VALUE;
+
+ for (int i = 0; i < watermarks.length; i++)
+ if (watermarks[i] < lowest)
+ lowest = watermarks[i];
+
+ return lowest;
+ }
+}