Sent messages are deleted individually after a crash
authorKai Moritz <kai@juplo.de>
Sun, 31 Jan 2021 21:24:16 +0000 (22:24 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 22:04:06 +0000 (00:04 +0200)
* If all messages with a sequence-number lower than or equal than the
  sequence number of a message already send are deleted, this can only
  be done for the lowest sequence-number seen accross all partitions
  without violating the at-least-once semantics.
* Deleting seen messages individually by specifying their id (aka
  sequence-number) explicitly in the delete-command allows to delete
  _all_ messages from the outbox, that have been seen on the topic,
  hence, achieving exactly-once-semantics even in the case of an
  unclean shutdown (aka crash) of the OutboxProducer

delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java [deleted file]

index f7b0c7f..1327113 100644 (file)
@@ -36,7 +36,7 @@ public class OutboxProducer
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
-  private final Watermarks watermarks;
+  private final Set<Long> send = new HashSet<>();
   private final Clock clock;
   private final Duration cleanupInterval;
 
@@ -78,8 +78,6 @@ public class OutboxProducer
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.assign(assignment);
 
-    this.watermarks = new Watermarks(partitions.size());
-
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
@@ -96,6 +94,7 @@ public class OutboxProducer
     }
     LOG.info("End-offsets: {}", endOffsets);
 
+    int deleted = 0;
     while(!Arrays.equals(currentOffsets, endOffsets))
     {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
@@ -103,23 +102,25 @@ public class OutboxProducer
       records.forEach(record ->
       {
         long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
-        LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
-        watermarks.set(record.partition(), recordSequenceNumber);
+        LOG.debug(
+            "Found message #{} on offset {} of partition {}",
+            recordSequenceNumber,
+            record.offset(),
+            record.partition());
+        send.add(recordSequenceNumber);
         currentOffsets[record.partition()] = record.offset();
       });
+      deleted += cleanUp();
       LOG.debug("Current offsets: {}", currentOffsets);
     }
 
-    LOG.info("Found watermarks: {}", watermarks);
-
-    sequenceNumber = watermarks.getLowest();
-    LOG.info("Restored sequence-number: {}", sequenceNumber);
+    LOG.info("Cleaned up {} already send entries from outbox table", deleted);
 
     consumer.close();
 
     this.clock = clock;
     this.cleanupInterval = properties.cleanupInterval;
-    this.nextCleanup = LocalTime.now(clock);
+    this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -134,17 +135,22 @@ public class OutboxProducer
         send(item);
       if (nextCleanup.isBefore(LocalTime.now(clock)))
       {
-        int deleted = repository.delete(watermarks.getLowest());
+        cleanUp();
         nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
-        LOG.info(
-            "Cleaned up {} entries from outbox, next clean-up: {}",
-            deleted,
-            nextCleanup);
+        LOG.debug("Next clean-up: {}", nextCleanup);
       }
     }
     while (items.size() > 0);
   }
 
+  int cleanUp()
+  {
+    int deleted = repository.delete(send);
+    LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+    send.clear();
+    return deleted;
+  }
+
   void send(OutboxItem item)
   {
     final ProducerRecord<String, String> record =
@@ -157,7 +163,7 @@ public class OutboxProducer
     {
       if (metadata != null)
       {
-        watermarks.set(metadata.partition(), item.getSequenceNumber());
+        send.add(item.getSequenceNumber());
         LOG.info(
             "{}/{}:{} - {}:{}={}",
             metadata.topic(),
index d6c814d..9dee55d 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.stereotype.Repository;
 import java.sql.Timestamp;
 import java.time.ZonedDateTime;
 import java.util.List;
+import java.util.Set;
 
 
 @Repository
@@ -19,7 +20,7 @@ public class OutboxRepository
   private static final String SQL_UPDATE =
       "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
   private static final String SQL_DELETE =
-      "DELETE FROM outbox WHERE id <= :id";
+      "DELETE FROM outbox WHERE id IN (:ids)";
 
   private final NamedParameterJdbcTemplate jdbcTemplate;
 
@@ -33,10 +34,12 @@ public class OutboxRepository
     jdbcTemplate.update(SQL_UPDATE, parameters);
   }
 
-  public int delete(Long id)
+  public int delete(Set<Long> ids)
   {
+    if (ids == null || ids.isEmpty())
+      return 0;
     MapSqlParameterSource parameters = new MapSqlParameterSource();
-    parameters.addValue("id", id);
+    parameters.addValue("ids", ids);
     return jdbcTemplate.update(SQL_DELETE, parameters);
   }
 
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java
deleted file mode 100644 (file)
index 4bd0c9e..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka.outbox.delivery;
-
-
-public class Watermarks
-{
-  private final long[] watermarks;
-
-
-  public Watermarks(int partitions)
-  {
-    watermarks = new long[partitions];
-  }
-
-
-  public synchronized void set(int partition, long watermark)
-  {
-    watermarks[partition] = watermark;
-  }
-
-  public synchronized long getLowest()
-  {
-    long lowest = Long.MAX_VALUE;
-
-    for (int i = 0; i < watermarks.length; i++)
-      if (watermarks[i] < lowest)
-        lowest = watermarks[i];
-
-    return lowest;
-  }
-
-  @Override
-  public String toString()
-  {
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < watermarks.length; i++)
-    {
-      builder.append("partition[");
-      builder.append(i);
-      builder.append("]=");
-      builder.append(watermarks[i]);
-      if (i != watermarks.length - 1)
-        builder.append(", ");
-    }
-    return builder.toString();
-  }
-}