diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
index 2ad4b7e..f0a226e 100644
--- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
+++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
@@ -1,26 +1,32 @@
 package de.juplo.kafka.outbox.delivery;
 
 import com.google.common.primitives.Longs;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.time.Clock;
 import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
+import java.time.LocalTime;
+import java.util.*;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
-@Component
 public class OutboxProducer
 {
   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
@@ -30,12 +36,17 @@
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
   private final String topic;
+  private final Set<Long> send = new HashSet<>();
+  private final Clock clock;
+  private final Duration cleanupInterval;
 
   private long sequenceNumber = 0l;
+  private LocalTime nextCleanup;
 
   public OutboxProducer(
       ApplicationProperties properties,
-      OutboxRepository repository)
+      OutboxRepository repository,
+      Clock clock)
   {
     this.repository = repository;
 
@@ -47,6 +58,78 @@
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
+
+    props = new Properties();
+    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
+    Set<TopicPartition> assignment = new HashSet<>();
+    for (PartitionInfo info : partitions)
+    {
+      LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
+      assignment.add(new TopicPartition(info.topic(), info.partition()));
+    }
+
+    LOG.info("Using topic {} with {} partitions", topic, partitions.size());
+
+    consumer.assign(assignment);
+
+    long[] currentOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
+    {
+      if (entry.getValue() == null)
+      {
+        LOG.debug("Found no offset for partition {}", entry.getKey());
+        currentOffsets[entry.getKey().partition()] = -1L;
+      }
+      else
+      {
+        LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+        currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1L;
+      }
+    }
+    LOG.info("Current offsets: {}", currentOffsets);
+
+    long[] endOffsets = new long[partitions.size()];
+    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
+    {
+      LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      endOffsets[entry.getKey().partition()] = entry.getValue() - 1L;
+    }
+    LOG.info("End-offsets: {}", endOffsets);
+
+    int deleted = 0;
+    while (!Arrays.equals(currentOffsets, endOffsets))
+    {
+      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+      LOG.debug("Fetched {} records", records.count());
+      records.forEach(record ->
+      {
+        long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
+        LOG.debug(
+            "Found message #{} on offset {} of partition {}",
+            recordSequenceNumber,
+            record.offset(),
+            record.partition());
+        send.add(recordSequenceNumber);
+        currentOffsets[record.partition()] = record.offset();
+      });
+      deleted += cleanUp();
+      LOG.debug("Current offsets: {}", currentOffsets);
+    }
+
+    LOG.info("Cleaned up {} already sent entries from outbox table", deleted);
+
+    consumer.close();
+
+    this.clock = clock;
+    this.cleanupInterval = properties.cleanupInterval;
+    this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
   }
 
   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
@@ -59,10 +142,24 @@
       LOG.debug("Polled {} new items", items.size());
       for (OutboxItem item : items)
         send(item);
+      if (nextCleanup.isBefore(LocalTime.now(clock)))
+      {
+        cleanUp();
+        nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+        LOG.debug("Next clean-up: {}", nextCleanup);
+      }
     }
     while (items.size() > 0);
   }
 
+  int cleanUp()
+  {
+    int deleted = repository.delete(send);
+    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
+    send.clear();
+    return deleted;
+  }
+
   void send(OutboxItem item)
   {
     final ProducerRecord<String, String> record =
@@ -75,16 +172,15 @@
     {
       if (metadata != null)
      {
-        int deleted = repository.delete(item.getSequenceNumber());
+        send.add(item.getSequenceNumber());
         LOG.info(
-            "{}/{}:{} - {}:{}={} - deleted: {}",
+            "{}/{}:{} - {}:{}={}",
             metadata.topic(),
             metadata.partition(),
             metadata.offset(),
             item.getSequenceNumber(),
             record.key(),
-            record.value(),
-            deleted);
+            record.value());
       }
       else
       {
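
The patch changes the clean-up strategy: instead of deleting each outbox row individually in the producer callback, sent sequence numbers are collected in the `send` set and purged in batches by `cleanUp()`. To survive restarts, the constructor first replays the outbox topic from the last committed offsets up to the current end offsets and rebuilds that set from the sequence-number headers of the records found there. The following self-contained sketch shows that replay step in isolation. It is illustrative, not part of the patch: the class name `SentEntriesRecovery`, the consumer group `outbox-recovery`, and the `headerName` parameter (standing in for the `HEADER` constant defined elsewhere in `OutboxProducer`) are assumptions, and progress is tracked with `position()`/`endOffsets()` instead of the patch's offset arrays.

// Illustrative sketch of the startup-recovery pass, NOT taken from the patch:
// replay the outbox topic once and collect the sequence numbers of records
// that were already delivered, so they can be deleted from the outbox table.
import com.google.common.primitives.Longs;

import java.time.Duration;
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

public class SentEntriesRecovery
{
  public static Set<Long> recover(String bootstrapServers, String topic, String headerName)
  {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(GROUP_ID_CONFIG, "outbox-recovery"); // illustrative group id
    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Set<Long> sent = new HashSet<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
    {
      // Assign all partitions of the topic explicitly -- no group rebalancing.
      Set<TopicPartition> assignment = new HashSet<>();
      for (PartitionInfo info : consumer.listTopics().get(topic))
        assignment.add(new TopicPartition(info.topic(), info.partition()));
      consumer.assign(assignment);

      // The end offsets (next offset to be written) mark where replay may stop.
      Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);

      // Track the next offset to be read per partition; position() resolves to
      // the committed offset, or the earliest one because of auto.offset.reset.
      Map<TopicPartition, Long> positions = new HashMap<>();
      for (TopicPartition partition : assignment)
        positions.put(partition, consumer.position(partition));

      while (positions.entrySet().stream()
          .anyMatch(e -> e.getValue() < endOffsets.get(e.getKey())))
      {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1)))
        {
          // The patch stores the sequence number in a record header,
          // serialized with Guava's Longs.toByteArray().
          sent.add(Longs.fromByteArray(record.headers().lastHeader(headerName).value()));
          positions.put(
              new TopicPartition(record.topic(), record.partition()),
              record.offset() + 1);
        }
      }
    }

    return sent;
  }
}

In the patched class the equivalent logic runs once in the constructor, before the first @Scheduled poll() cycle, so delivery and clean-up start from a state consistent with what was already written to Kafka.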