package de.juplo.kafka.outbox.delivery;
import com.google.common.primitives.Longs;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Clock;
import java.time.Duration;
import java.time.LocalTime;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.PreDestroy;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
- private final Watermarks watermarks;
// Sequence numbers of outbox items confirmed as published to Kafka (filled by
// the send()-callback and by the start-up replay); drained by cleanUp(), which
// deletes the matching rows from the outbox table.
+ private final Set<Long> send = new HashSet<>();
private final Clock clock;
private final Duration cleanupInterval;
this.producer = new KafkaProducer<>(props);
this.topic = properties.topic;
- this.watermarks = new Watermarks();
+ props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(GROUP_ID_CONFIG, "outbox");
+ props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(this.topic));
+ List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
+ Set<TopicPartition> assignment = new HashSet<>();
+ for (PartitionInfo info : partitions)
+ {
+ LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
+ assignment.add(new TopicPartition(info.topic(), info.partition()));
+ }
+
+ LOG.info("Using topic {} with {} partitions", topic, partitions);
+
+ long[] currentOffsets = new long[partitions.size()];
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
+ {
+ LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+ currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+ }
+ LOG.info("Current offsets: {}", currentOffsets);
+
+ long[] endOffsets = new long[partitions.size()];
+ for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
+ {
+ LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+ endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
+ }
+ LOG.info("End-offsets: {}", endOffsets);
+
+ int deleted = 0;
+ while(!Arrays.equals(currentOffsets, endOffsets))
+ {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+ LOG.debug("Fetched {} records", records.count());
+ records.forEach(record ->
+ {
+ long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
+ LOG.debug(
+ "Found message #{} on offset {} of partition {}",
+ recordSequenceNumber,
+ record.offset(),
+ record.partition());
+ send.add(recordSequenceNumber);
+ currentOffsets[record.partition()] = record.offset();
+ });
+ deleted += cleanUp();
+ LOG.debug("Current offsets: {}", currentOffsets);
+ }
+
+ LOG.info("Cleaned up {} already send entries from outbox table", deleted);
+
+ consumer.close();
+
this.clock = clock;
this.cleanupInterval = properties.cleanupInterval;
- this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+ this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
}
@Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
send(item);
if (nextCleanup.isBefore(LocalTime.now(clock)))
{
- int deleted = repository.delete(watermarks.getLowest());
+ cleanUp();
nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
- LOG.info(
- "Cleaned up {} entries from outbox, next clean-up: {}",
- deleted,
- nextCleanup);
+ LOG.debug("Next clean-up: {}", nextCleanup);
}
}
while (items.size() > 0);
}
+ int cleanUp()
+ {
+ int deleted = repository.delete(send);
+ LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+ send.clear();
+ return deleted;
+ }
+
void send(OutboxItem item)
{
final ProducerRecord<String, String> record =
{
if (metadata != null)
{
- watermarks.set(metadata.partition(), item.getSequenceNumber());
+ send.add(item.getSequenceNumber());
LOG.info(
"{}/{}:{} - {}:{}={}",
metadata.topic(),