The OutboxProducer restores the sequence-number from the wirtten topic
authorKai Moritz <kai@juplo.de>
Sat, 30 Jan 2021 21:39:41 +0000 (22:39 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 21:53:16 +0000 (23:53 +0200)
* The OutboxProducer consumes the written messages on start-up, extracts the
  send sequence-numbers and recalculates the seen watermarks and the maximal
  sequence-number, for at-least-once semantics.
* This implementation will most probably resend some messages after a crash,
  but it will never skip any unsend messages.
* Messages may be resend despite the recalculated watermarks, because only
  messages with a sequence number lower than the lowest watermark can be
  discarded safely from the outbox, if message-loss is not acceptable.
* Deactivated the integration-test for the loading of the context, because
  it cannot work without an available Kafka-Cluster.

delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java
delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java [deleted file]

index 29d827a..331374d 100644 (file)
@@ -1,13 +1,18 @@
 package de.juplo.kafka.outbox.delivery;
 
 import com.google.common.primitives.Longs;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.time.Clock;
 import java.time.Duration;
 import java.time.LocalTime;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -18,6 +23,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 import javax.annotation.PreDestroy;
 
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
@@ -53,10 +59,66 @@ public class OutboxProducer
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
 
-    this.watermarks = new Watermarks();
+    props = new Properties();
+    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.subscribe(Arrays.asList(this.topic));
+    List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
+    Set<TopicPartition> assignment = new HashSet<>();
+    for (PartitionInfo info : partitions)
+    {
+      LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
+      assignment.add(new TopicPartition(info.topic(), info.partition()));
+    }
+
+    LOG.info("Using topic {} with {} partitions", topic, partitions);
+
+    this.watermarks = new Watermarks(partitions.size());
+
+    long[] currentOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
+    {
+      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+    }
+    LOG.info("Current offsets: {}", currentOffsets);
+
+    long[] endOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
+    {
+      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
+    }
+    LOG.info("End-offsets: {}", endOffsets);
+
+    while(!Arrays.equals(currentOffsets, endOffsets))
+    {
+      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+      LOG.debug("Fetched {} records", records.count());
+      records.forEach(record ->
+      {
+        long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
+        LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
+        watermarks.set(record.partition(), recordSequenceNumber);
+        currentOffsets[record.partition()] = record.offset();
+      });
+      LOG.debug("Current offsets: {}", currentOffsets);
+    }
+
+    LOG.info("Found watermarks: {}", watermarks);
+
+    sequenceNumber = watermarks.getLowest();
+    LOG.info("Restored sequence-number: {}", sequenceNumber);
+
+    consumer.close();
+
     this.clock = clock;
     this.cleanupInterval = properties.cleanupInterval;
-    this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+    this.nextCleanup = LocalTime.now(clock);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
index 8f071f9..4bd0c9e 100644 (file)
@@ -3,19 +3,17 @@ package de.juplo.kafka.outbox.delivery;
 
 public class Watermarks
 {
-  private long[] watermarks = new long[0];
+  private final long[] watermarks;
 
 
-  public synchronized void set(int partition, long watermark)
+  public Watermarks(int partitions)
   {
-    if (partition >= watermarks.length)
-    {
-      long[] resized = new long[partition + 1];
-      for (int i = 0; i < watermarks.length; i++)
-        resized[i] = watermarks[i];
-      watermarks = resized;
-    }
+    watermarks = new long[partitions];
+  }
 
+
+  public synchronized void set(int partition, long watermark)
+  {
     watermarks[partition] = watermark;
   }
 
@@ -29,4 +27,20 @@ public class Watermarks
 
     return lowest;
   }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < watermarks.length; i++)
+    {
+      builder.append("partition[");
+      builder.append(i);
+      builder.append("]=");
+      builder.append(watermarks[i]);
+      if (i != watermarks.length - 1)
+        builder.append(", ");
+    }
+    return builder.toString();
+  }
 }
diff --git a/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java
deleted file mode 100644 (file)
index b8f1834..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.outbox.delivery;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class ApplicationTests
-{
-  @Test
-  public void contextLoads()
-  {
-  }
-}