From a92d318043bf698dc2a949db2b76893c8abf03a1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 30 Jan 2021 22:39:41 +0100 Subject: [PATCH] The OutboxProducer restores the sequence-number from the wirtten topic * The OutboxProducer consumes the written messages on start-up, extracts the send sequence-numbers and recalculates the seen watermarks and the maximal sequence-number, for at-least-once semantics. * This implementation will most probably resend some messages after a crash, but it will never skip any unsend messages. * Messages may be resend despite the recalculated watermarks, because only messages with a sequence number lower than the lowest watermark can be discarded safely from the outbox, if message-loss is not acceptable. * Deactivated the integration-test for the loading of the context, because it cannot work without an available Kafka-Cluster. --- .../kafka/outbox/delivery/OutboxProducer.java | 70 +++++++++++++++++-- .../kafka/outbox/delivery/Watermarks.java | 32 ++++++--- .../outbox/delivery/ApplicationTests.java | 16 ----- 3 files changed, 89 insertions(+), 29 deletions(-) delete mode 100644 delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 29d827a..331374d 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -1,13 +1,18 @@ package de.juplo.kafka.outbox.delivery; import com.google.common.primitives.Longs; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Clock; import java.time.Duration; import java.time.LocalTime; -import java.util.List; -import java.util.Properties; +import java.util.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -18,6 +23,7 @@ import org.springframework.scheduling.annotation.Scheduled; import javax.annotation.PreDestroy; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; import static org.apache.kafka.clients.producer.ProducerConfig.*; @@ -53,10 +59,66 @@ public class OutboxProducer this.producer = new KafkaProducer<>(props); this.topic = properties.topic; - this.watermarks = new Watermarks(); + props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(GROUP_ID_CONFIG, "outbox"); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Arrays.asList(this.topic)); + List partitions = consumer.listTopics().get(this.topic); + Set assignment = new HashSet<>(); + for (PartitionInfo info : partitions) + { + LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas()); + assignment.add(new TopicPartition(info.topic(), info.partition())); + } + + LOG.info("Using topic {} with {} partitions", topic, partitions); + + this.watermarks = new Watermarks(partitions.size()); + + long[] currentOffsets = new long[partitions.size()]; + for (Map.Entry entry : consumer.committed(assignment).entrySet()) + { + LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); + currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + } + LOG.info("Current offsets: {}", currentOffsets); + + long[] endOffsets = new long[partitions.size()]; + for (Map.Entry entry : consumer.endOffsets(assignment).entrySet()) + { + LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); + endOffsets[entry.getKey().partition()] = entry.getValue() - 1l; + } + LOG.info("End-offsets: {}", endOffsets); + + while(!Arrays.equals(currentOffsets, endOffsets)) + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + LOG.debug("Fetched {} records", records.count()); + records.forEach(record -> + { + long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value()); + LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber); + watermarks.set(record.partition(), recordSequenceNumber); + currentOffsets[record.partition()] = record.offset(); + }); + LOG.debug("Current offsets: {}", currentOffsets); + } + + LOG.info("Found watermarks: {}", watermarks); + + sequenceNumber = watermarks.getLowest(); + LOG.info("Restored sequence-number: {}", sequenceNumber); + + consumer.close(); + this.clock = clock; this.cleanupInterval = properties.cleanupInterval; - this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval); + this.nextCleanup = LocalTime.now(clock); } @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java index 8f071f9..4bd0c9e 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Watermarks.java @@ -3,19 +3,17 @@ package de.juplo.kafka.outbox.delivery; public class Watermarks { - private long[] watermarks = new long[0]; + private final long[] watermarks; - public synchronized void set(int partition, long watermark) + public Watermarks(int partitions) { - if (partition >= watermarks.length) - { - long[] resized = new long[partition + 1]; - for (int i = 0; i < watermarks.length; i++) - resized[i] = watermarks[i]; - watermarks = resized; - } + watermarks = new long[partitions]; + } + + public synchronized void set(int partition, long watermark) + { watermarks[partition] = watermark; } @@ -29,4 +27,20 @@ public class Watermarks return lowest; } + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < watermarks.length; i++) + { + builder.append("partition["); + builder.append(i); + builder.append("]="); + builder.append(watermarks[i]); + if (i != watermarks.length - 1) + builder.append(", "); + } + return builder.toString(); + } } diff --git a/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java deleted file mode 100644 index b8f1834..0000000 --- a/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.outbox.delivery; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -@RunWith(SpringRunner.class) -@SpringBootTest -public class ApplicationTests -{ - @Test - public void contextLoads() - { - } -} -- 2.20.1