From b6d734cd09d3e23171eaf6235d19e73bc11ab420 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 31 Jan 2021 22:24:16 +0100 Subject: [PATCH] Sent messages are deleted individually after a crash * If all messages with a sequence-number lower than or equal than the sequence number of a message already send are deleted, this can only be done for the lowest sequence-number seen accross all partitions without violating the at-least-once semantics. * Deleting seen messages individually by specifying their id (aka sequence-number) explicitly in the delete-command allows to delete _all_ messages from the outbox, that have been seen on the topic, hence, achieving exactly-once-semantics even in the case of an unclean shutdown (aka crash) of the OutboxProducer --- .../kafka/outbox/delivery/OutboxProducer.java | 38 ++++++++------- .../outbox/delivery/OutboxRepository.java | 9 ++-- .../kafka/outbox/delivery/Watermarks.java | 46 ------------------- 3 files changed, 28 insertions(+), 65 deletions(-) delete mode 100644 delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index f7b0c7f..1327113 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -36,7 +36,7 @@ public class OutboxProducer private final OutboxRepository repository; private final KafkaProducer producer; private final String topic; - private final Watermarks watermarks; + private final Set send = new HashSet<>(); private final Clock clock; private final Duration cleanupInterval; @@ -78,8 +78,6 @@ public class OutboxProducer KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.assign(assignment); - this.watermarks = new Watermarks(partitions.size()); - long[] currentOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.committed(assignment).entrySet()) { @@ -96,6 +94,7 @@ public class OutboxProducer } LOG.info("End-offsets: {}", endOffsets); + int deleted = 0; while(!Arrays.equals(currentOffsets, endOffsets)) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); @@ -103,23 +102,25 @@ public class OutboxProducer records.forEach(record -> { long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value()); - LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber); - watermarks.set(record.partition(), recordSequenceNumber); + LOG.debug( + "Found message #{} on offset {} of partition {}", + recordSequenceNumber, + record.offset(), + record.partition()); + send.add(recordSequenceNumber); currentOffsets[record.partition()] = record.offset(); }); + deleted += cleanUp(); LOG.debug("Current offsets: {}", currentOffsets); } - LOG.info("Found watermarks: {}", watermarks); - - sequenceNumber = watermarks.getLowest(); - LOG.info("Restored sequence-number: {}", sequenceNumber); + LOG.info("Cleaned up {} already send entries from outbox table", deleted); consumer.close(); this.clock = clock; this.cleanupInterval = properties.cleanupInterval; - this.nextCleanup = LocalTime.now(clock); + this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval); } @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") @@ -134,17 +135,22 @@ public class OutboxProducer send(item); if (nextCleanup.isBefore(LocalTime.now(clock))) { - int deleted = repository.delete(watermarks.getLowest()); + cleanUp(); nextCleanup = LocalTime.now(clock).plus(cleanupInterval); - LOG.info( - "Cleaned up {} entries from outbox, next clean-up: {}", - deleted, - nextCleanup); + LOG.debug("Next clean-up: {}", nextCleanup); } } while (items.size() > 0); } + int cleanUp() + { + int deleted = repository.delete(send); + LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size()); + send.clear(); + return deleted; + } + void send(OutboxItem item) { final ProducerRecord record = @@ -157,7 +163,7 @@ public class OutboxProducer { if (metadata != null) { - watermarks.set(metadata.partition(), item.getSequenceNumber()); + send.add(item.getSequenceNumber()); LOG.info( "{}/{}:{} - {}:{}={}", metadata.topic(), diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java index d6c814d..9dee55d 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java @@ -8,6 +8,7 @@ import org.springframework.stereotype.Repository; import java.sql.Timestamp; import java.time.ZonedDateTime; import java.util.List; +import java.util.Set; @Repository @@ -19,7 +20,7 @@ public class OutboxRepository private static final String SQL_UPDATE = "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; private static final String SQL_DELETE = - "DELETE FROM outbox WHERE id <= :id"; + "DELETE FROM outbox WHERE id IN (:ids)"; private final NamedParameterJdbcTemplate jdbcTemplate; @@ -33,10 +34,12 @@ public class OutboxRepository jdbcTemplate.update(SQL_UPDATE, parameters); } - public int delete(Long id) + public int delete(Set ids) { + if (ids == null || ids.isEmpty()) + return 0; MapSqlParameterSource parameters = new MapSqlParameterSource(); - parameters.addValue("id", id); + parameters.addValue("ids", ids); return jdbcTemplate.update(SQL_DELETE, parameters); } diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java deleted file mode 100644 index 4bd0c9e..0000000 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java +++ /dev/null @@ -1,46 +0,0 @@ -package de.juplo.kafka.outbox.delivery; - - -public class Watermarks -{ - private final long[] watermarks; - - - public Watermarks(int partitions) - { - watermarks = new long[partitions]; - } - - - public synchronized void set(int partition, long watermark) - { - watermarks[partition] = watermark; - } - - public synchronized long getLowest() - { - long lowest = Long.MAX_VALUE; - - for (int i = 0; i < watermarks.length; i++) - if (watermarks[i] < lowest) - lowest = watermarks[i]; - - return lowest; - } - - @Override - public String toString() - { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < watermarks.length; i++) - { - builder.append("partition["); - builder.append(i); - builder.append("]="); - builder.append(watermarks[i]); - if (i != watermarks.length - 1) - builder.append(", "); - } - return builder.toString(); - } -} -- 2.20.1