From: Kai Moritz Date: Sun, 7 Feb 2021 13:18:13 +0000 (+0100) Subject: Old entries are removed from the outbox-table in batches X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=374dcbb90bcafe7682a8626de94ed0cd4c377e5e;p=demos%2Fkafka%2Foutbox Old entries are removed from the outbox-table in batches --- diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java index 6abd181..111c6b4 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java @@ -3,14 +3,26 @@ package de.juplo.kafka.outbox.delivery; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; +import java.time.Clock; + @SpringBootApplication @EnableConfigurationProperties(ApplicationProperties.class) @EnableScheduling public class Application { + @Bean + public OutboxProducer outboxProducer( + ApplicationProperties properties, + OutboxRepository repository) + { + return new OutboxProducer(properties, repository, Clock.systemDefaultZone()); + } + + public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java index 4e36aa4..74f91d4 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java @@ -4,6 +4,8 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.time.Duration; + @ConfigurationProperties("de.juplo.kafka.outbox") @Getter @@ -12,4 +14,5 @@ public class ApplicationProperties { String bootstrapServers = "localhost:9092"; String topic = "outbox"; + Duration cleanupInterval = Duration.ofSeconds(10); } diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 2ad4b7e..29d827a 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -3,7 +3,9 @@ package de.juplo.kafka.outbox.delivery; import com.google.common.primitives.Longs; import org.apache.kafka.common.serialization.StringSerializer; +import java.time.Clock; import java.time.Duration; +import java.time.LocalTime; import java.util.List; import java.util.Properties; @@ -12,7 +14,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; @@ -20,7 +21,6 @@ import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON import static org.apache.kafka.clients.producer.ProducerConfig.*; -@Component public class OutboxProducer { final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); @@ -30,12 +30,17 @@ public class OutboxProducer private final OutboxRepository repository; private final KafkaProducer producer; private final String topic; + private final Watermarks watermarks; + private final Clock clock; + private final Duration cleanupInterval; private long sequenceNumber = 0l; + private LocalTime nextCleanup; public OutboxProducer( ApplicationProperties properties, - OutboxRepository repository) + OutboxRepository repository, + Clock clock) { this.repository = repository; @@ -47,6 +52,11 @@ public class OutboxProducer this.producer = new KafkaProducer<>(props); this.topic = properties.topic; + + this.watermarks = new Watermarks(); + this.clock = clock; + this.cleanupInterval = properties.cleanupInterval; + this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval); } @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") @@ -59,6 +69,15 @@ public class OutboxProducer LOG.debug("Polled {} new items", items.size()); for (OutboxItem item : items) send(item); + if (nextCleanup.isBefore(LocalTime.now(clock))) + { + int deleted = repository.delete(watermarks.getLowest()); + nextCleanup = LocalTime.now(clock).plus(cleanupInterval); + LOG.info( + "Cleaned up {} entries from outbox, next clean-up: {}", + deleted, + nextCleanup); + } } while (items.size() > 0); } @@ -75,16 +94,15 @@ public class OutboxProducer { if (metadata != null) { - int deleted = repository.delete(item.getSequenceNumber()); + watermarks.set(metadata.partition(), item.getSequenceNumber()); LOG.info( - "{}/{}:{} - {}:{}={} - deleted: {}", + "{}/{}:{} - {}:{}={}", metadata.topic(), metadata.partition(), metadata.offset(), item.getSequenceNumber(), record.key(), - record.value(), - deleted); + record.value()); } else { diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java index abf2d1d..d6c814d 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java @@ -19,7 +19,7 @@ public class OutboxRepository private static final String SQL_UPDATE = "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; private static final String SQL_DELETE = - "DELETE FROM outbox WHERE id = :id"; + "DELETE FROM outbox WHERE id <= :id"; private final NamedParameterJdbcTemplate jdbcTemplate; diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java new file mode 100644 index 0000000..8f071f9 --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java @@ -0,0 +1,32 @@ +package de.juplo.kafka.outbox.delivery; + + +public class Watermarks +{ + private long[] watermarks = new long[0]; + + + public synchronized void set(int partition, long watermark) + { + if (partition >= watermarks.length) + { + long[] resized = new long[partition + 1]; + for (int i = 0; i < watermarks.length; i++) + resized[i] = watermarks[i]; + watermarks = resized; + } + + watermarks[partition] = watermark; + } + + public synchronized long getLowest() + { + long lowest = Long.MAX_VALUE; + + for (int i = 0; i < watermarks.length; i++) + if (watermarks[i] < lowest) + lowest = watermarks[i]; + + return lowest; + } +}