The OutboxProducer restores the sequence-number from the wirtten topic
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
index 29d827a..331374d 100644 (file)
@@ -1,13 +1,18 @@
 package de.juplo.kafka.outbox.delivery;
 
 import com.google.common.primitives.Longs;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.time.Clock;
 import java.time.Duration;
 import java.time.LocalTime;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -18,6 +23,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 import javax.annotation.PreDestroy;
 
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
@@ -53,10 +59,66 @@ public class OutboxProducer
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
 
-    this.watermarks = new Watermarks();
+    props = new Properties();
+    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.subscribe(Arrays.asList(this.topic));
+    List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
+    Set<TopicPartition> assignment = new HashSet<>();
+    for (PartitionInfo info : partitions)
+    {
+      LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
+      assignment.add(new TopicPartition(info.topic(), info.partition()));
+    }
+
+    LOG.info("Using topic {} with {} partitions", topic, partitions);
+
+    this.watermarks = new Watermarks(partitions.size());
+
+    long[] currentOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
+    {
+      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+    }
+    LOG.info("Current offsets: {}", currentOffsets);
+
+    long[] endOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
+    {
+      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
+    }
+    LOG.info("End-offsets: {}", endOffsets);
+
+    while(!Arrays.equals(currentOffsets, endOffsets))
+    {
+      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+      LOG.debug("Fetched {} records", records.count());
+      records.forEach(record ->
+      {
+        long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
+        LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
+        watermarks.set(record.partition(), recordSequenceNumber);
+        currentOffsets[record.partition()] = record.offset();
+      });
+      LOG.debug("Current offsets: {}", currentOffsets);
+    }
+
+    LOG.info("Found watermarks: {}", watermarks);
+
+    sequenceNumber = watermarks.getLowest();
+    LOG.info("Restored sequence-number: {}", sequenceNumber);
+
+    consumer.close();
+
     this.clock = clock;
     this.cleanupInterval = properties.cleanupInterval;
-    this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+    this.nextCleanup = LocalTime.now(clock);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")