X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=29d827acde5aed9bd9d07ce3276ac521759d721c;hp=2ad4b7e38acaf45cac25d712d9e2985223604074;hb=374dcbb90bcafe7682a8626de94ed0cd4c377e5e;hpb=cf23cb6c92a4a166ab9a8dff7d967a0bb2847378 diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 2ad4b7e..29d827a 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -3,7 +3,9 @@ package de.juplo.kafka.outbox.delivery; import com.google.common.primitives.Longs; import org.apache.kafka.common.serialization.StringSerializer; +import java.time.Clock; import java.time.Duration; +import java.time.LocalTime; import java.util.List; import java.util.Properties; @@ -12,7 +14,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; @@ -20,7 +21,6 @@ import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON import static org.apache.kafka.clients.producer.ProducerConfig.*; -@Component public class OutboxProducer { final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); @@ -30,12 +30,17 @@ public class OutboxProducer private final OutboxRepository repository; private final KafkaProducer producer; private final String topic; + private final Watermarks watermarks; + private final Clock clock; + private final Duration cleanupInterval; private long sequenceNumber = 0l; + private LocalTime nextCleanup; public OutboxProducer( ApplicationProperties properties, - OutboxRepository repository) + OutboxRepository repository, + Clock clock) { this.repository = repository; @@ -47,6 +52,11 @@ public class OutboxProducer this.producer = new KafkaProducer<>(props); this.topic = properties.topic; + + this.watermarks = new Watermarks(); + this.clock = clock; + this.cleanupInterval = properties.cleanupInterval; + this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval); } @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") @@ -59,6 +69,15 @@ public class OutboxProducer LOG.debug("Polled {} new items", items.size()); for (OutboxItem item : items) send(item); + if (nextCleanup.isBefore(LocalTime.now(clock))) + { + int deleted = repository.delete(watermarks.getLowest()); + nextCleanup = LocalTime.now(clock).plus(cleanupInterval); + LOG.info( + "Cleaned up {} entries from outbox, next clean-up: {}", + deleted, + nextCleanup); + } } while (items.size() > 0); } @@ -75,16 +94,15 @@ public class OutboxProducer { if (metadata != null) { - int deleted = repository.delete(item.getSequenceNumber()); + watermarks.set(metadata.partition(), item.getSequenceNumber()); LOG.info( - "{}/{}:{} - {}:{}={} - deleted: {}", + "{}/{}:{} - {}:{}={}", metadata.topic(), metadata.partition(), metadata.offset(), item.getSequenceNumber(), record.key(), - record.value(), - deleted); + record.value()); } else {