Sent messages are deleted individually after a crash
[demos/kafka/outbox] delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
index 09c8789..1327113 100644
@@ -1,40 +1,52 @@
 package de.juplo.kafka.outbox.delivery;
 
 import com.google.common.primitives.Longs;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.time.Clock;
 import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
+import java.time.LocalTime;
+import java.util.*;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
-@Component
 public class OutboxProducer
 {
   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
 
+  // Name of the record header that transports the outbox sequence number
+  public final static String HEADER = "#";
 
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
+  private final Set<Long> send = new HashSet<>();
+  private final Clock clock;
+  private final Duration cleanupInterval;
 
   private long sequenceNumber = 0l;
+  private LocalTime nextCleanup;
 
   public OutboxProducer(
       ApplicationProperties properties,
-      OutboxRepository repository)
+      OutboxRepository repository,
+      Clock clock)
   {
     this.repository = repository;
 
@@ -46,6 +58,69 @@ public class OutboxProducer
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
+
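+    // After a crash, some records may already have been sent to Kafka although
+    // the corresponding outbox entries were not yet deleted. A short-lived
+    // consumer re-reads the topic to recover their sequence numbers.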
+    props = new Properties();
+    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(GROUP_ID_CONFIG, "outbox");
+    // Start from the beginning, if no offsets have been committed yet
+    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+    // The consumer has to be created before the partitions can be looked up
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+    List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
+    Set<TopicPartition> assignment = new HashSet<>();
+    for (PartitionInfo info : partitions)
+    {
+      LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
+      assignment.add(new TopicPartition(info.topic(), info.partition()));
+    }
+
+    LOG.info("Using topic {} with {} partitions", topic, partitions.size());
+
+    consumer.assign(assignment);
+
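+    // Records between the last committed and the last existing offset were
+    // sent before the crash, but their outbox entries may not have been
+    // deleted yet.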
+    long[] currentOffsets = new long[partitions.size()];
+    Arrays.fill(currentOffsets, -1L);
+    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
+    {
+      if (entry.getValue() == null)
+        continue; // No offset committed for this partition yet
+      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1L;
+    }
+    LOG.info("Current offsets: {}", Arrays.toString(currentOffsets));
+
+    long[] endOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
+    {
+      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      endOffsets[entry.getKey().partition()] = entry.getValue() - 1L;
+    }
+    LOG.info("End-offsets: {}", Arrays.toString(endOffsets));
+
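+    // Poll until the catch-up is complete, remembering the sequence number of
+    // every record found, so the matching outbox entries can be deleted.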
+    int deleted = 0;
+    while (!Arrays.equals(currentOffsets, endOffsets))
+    {
+      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+      LOG.debug("Fetched {} records", records.count());
+      records.forEach(record ->
+      {
+        long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
+        LOG.debug(
+            "Found message #{} on offset {} of partition {}",
+            recordSequenceNumber,
+            record.offset(),
+            record.partition());
+        send.add(recordSequenceNumber);
+        currentOffsets[record.partition()] = record.offset();
+      });
+      deleted += cleanUp();
+      LOG.debug("Current offsets: {}", Arrays.toString(currentOffsets));
+    }
+
+    LOG.info("Cleaned up {} already sent entries from outbox table", deleted);
+
+    consumer.close();
+
+    this.clock = clock;
+    this.cleanupInterval = properties.cleanupInterval;
+    this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -58,32 +133,45 @@ public class OutboxProducer
       LOG.debug("Polled {} new items", items.size());
       for (OutboxItem item : items)
         send(item);
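+      // Time-based batching: confirmed entries are deleted at most once per
+      // clean-up interval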
+      if (nextCleanup.isBefore(LocalTime.now(clock)))
+      {
+        cleanUp();
+        nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+        LOG.debug("Next clean-up: {}", nextCleanup);
+      }
     }
     while (items.size() > 0);
   }
 
+  // Deletes all outbox entries whose records have been confirmed by the broker
+  // (or were found in the topic during crash recovery)
+  int cleanUp()
+  {
+    int deleted = repository.delete(send);
+    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
+    send.clear();
+    return deleted;
+  }
+
   void send(OutboxItem item)
   {
     final ProducerRecord<String, String> record =
         new ProducerRecord<>(topic, item.getKey(), item.getValue());
 
     sequenceNumber = item.getSequenceNumber();
-    record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
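+    // The sequence number travels as a record header, so that the recovery
+    // scan at start-up can map topic records back to outbox entries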
+    record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
 
     producer.send(record, (metadata, e) ->
     {
       if (metadata != null)
       {
-        int deleted = repository.delete(item.getSequenceNumber());
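+        // Mark as sent; deletion happens in batches via cleanUp()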
+        send.add(item.getSequenceNumber());
         LOG.info(
-            "{}/{}:{} - {}:{}={} - deleted: {}",
+            "{}/{}:{} - {}:{}={}",
             metadata.topic(),
             metadata.partition(),
             metadata.offset(),
             item.getSequenceNumber(),
             record.key(),
-            record.value(),
-            deleted);
+            record.value());
       }
       else
       {