Sent messages are deleted individually after a crash
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
index f7b0c7f..1327113 100644 (file)
@@ -36,7 +36,7 @@ public class OutboxProducer
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
-  private final Watermarks watermarks;
+  private final Set<Long> send = new HashSet<>();
   private final Clock clock;
   private final Duration cleanupInterval;
 
@@ -78,8 +78,6 @@ public class OutboxProducer
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.assign(assignment);
 
-    this.watermarks = new Watermarks(partitions.size());
-
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
@@ -96,6 +94,7 @@ public class OutboxProducer
     }
     LOG.info("End-offsets: {}", endOffsets);
 
+    int deleted = 0;
     while(!Arrays.equals(currentOffsets, endOffsets))
     {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
@@ -103,23 +102,25 @@ public class OutboxProducer
       records.forEach(record ->
       {
         long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
-        LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
-        watermarks.set(record.partition(), recordSequenceNumber);
+        LOG.debug(
+            "Found message #{} on offset {} of partition {}",
+            recordSequenceNumber,
+            record.offset(),
+            record.partition());
+        send.add(recordSequenceNumber);
         currentOffsets[record.partition()] = record.offset();
       });
+      deleted += cleanUp();
       LOG.debug("Current offsets: {}", currentOffsets);
     }
 
-    LOG.info("Found watermarks: {}", watermarks);
-
-    sequenceNumber = watermarks.getLowest();
-    LOG.info("Restored sequence-number: {}", sequenceNumber);
+    LOG.info("Cleaned up {} already send entries from outbox table", deleted);
 
     consumer.close();
 
     this.clock = clock;
     this.cleanupInterval = properties.cleanupInterval;
-    this.nextCleanup = LocalTime.now(clock);
+    this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -134,17 +135,22 @@ public class OutboxProducer
         send(item);
       if (nextCleanup.isBefore(LocalTime.now(clock)))
       {
-        int deleted = repository.delete(watermarks.getLowest());
+        cleanUp();
         nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
-        LOG.info(
-            "Cleaned up {} entries from outbox, next clean-up: {}",
-            deleted,
-            nextCleanup);
+        LOG.debug("Next clean-up: {}", nextCleanup);
       }
     }
     while (items.size() > 0);
   }
 
+  int cleanUp()
+  {
+    int deleted = repository.delete(send);
+    LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+    send.clear();
+    return deleted;
+  }
+
   void send(OutboxItem item)
   {
     final ProducerRecord<String, String> record =
@@ -157,7 +163,7 @@ public class OutboxProducer
     {
       if (metadata != null)
       {
-        watermarks.set(metadata.partition(), item.getSequenceNumber());
+        send.add(item.getSequenceNumber());
         LOG.info(
             "{}/{}:{} - {}:{}={}",
             metadata.topic(),