Fixed bug on first start-up (no current offsets)
demos/kafka/outbox/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
index 331374d..f0a226e 100644
@@ -36,7 +36,7 @@ public class OutboxProducer
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
-  private final Watermarks watermarks;
+  private final Set<Long> send = new HashSet<>();
   private final Clock clock;
   private final Duration cleanupInterval;
 
@@ -62,11 +62,10 @@ public class OutboxProducer
     props = new Properties();
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    consumer.subscribe(Arrays.asList(this.topic));
     List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
     Set<TopicPartition> assignment = new HashSet<>();
     for (PartitionInfo info : partitions)
@@ -77,24 +76,34 @@ public class OutboxProducer
 
     LOG.info("Using topic {} with {} partitions", topic, partitions);
 
-    this.watermarks = new Watermarks(partitions.size());
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.assign(assignment);
 
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
-      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
-      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      if (entry.getValue() == null)
+      {
+        LOG.debug("Found no offset for partition {}", entry.getKey());
+        currentOffsets[entry.getKey().partition()] = -1l;
+      }
+      else
+      {
+        LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+        currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      }
     }
     LOG.info("Current offsets: {}", currentOffsets);
 
     long[] endOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
     {
-      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
       endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
     }
     LOG.info("End-offsets: {}", endOffsets);
 
+    int deleted = 0;
     while(!Arrays.equals(currentOffsets, endOffsets))
     {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
@@ -102,23 +111,25 @@ public class OutboxProducer
       records.forEach(record ->
       {
         long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
-        LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
-        watermarks.set(record.partition(), recordSequenceNumber);
+        LOG.debug(
+            "Found message #{} on offset {} of partition {}",
+            recordSequenceNumber,
+            record.offset(),
+            record.partition());
+        send.add(recordSequenceNumber);
         currentOffsets[record.partition()] = record.offset();
       });
+      deleted += cleanUp();
       LOG.debug("Current offsets: {}", currentOffsets);
     }
 
-    LOG.info("Found watermarks: {}", watermarks);
-
-    sequenceNumber = watermarks.getLowest();
-    LOG.info("Restored sequence-number: {}", sequenceNumber);
+    LOG.info("Cleaned up {} already send entries from outbox table", deleted);
 
     consumer.close();
 
     this.clock = clock;
     this.cleanupInterval = properties.cleanupInterval;
-    this.nextCleanup = LocalTime.now(clock);
+    this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -133,17 +144,22 @@ public class OutboxProducer
         send(item);
       if (nextCleanup.isBefore(LocalTime.now(clock)))
       {
-        int deleted = repository.delete(watermarks.getLowest());
+        cleanUp();
         nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
-        LOG.info(
-            "Cleaned up {} entries from outbox, next clean-up: {}",
-            deleted,
-            nextCleanup);
+        LOG.debug("Next clean-up: {}", nextCleanup);
       }
     }
     while (items.size() > 0);
   }
 
+  int cleanUp()
+  {
+    int deleted = repository.delete(send);
+    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
+    send.clear();
+    return deleted;
+  }
+
   void send(OutboxItem item)
   {
     final ProducerRecord<String, String> record =
@@ -156,7 +172,7 @@ public class OutboxProducer
     {
       if (metadata != null)
       {
-        watermarks.set(metadata.partition(), item.getSequenceNumber());
+        send.add(item.getSequenceNumber());
         LOG.info(
             "{}/{}:{} - {}:{}={}",
             metadata.topic(),