X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;fp=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=1327113c531664aadb9b32f49d216dc879d93bb7;hp=f7b0c7f4a356e638b260e4ac18220d789b051b82;hb=b6d734cd09d3e23171eaf6235d19e73bc11ab420;hpb=3f41296dae5c094a29f8a89cda2bccfb8bc93c0a diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index f7b0c7f..1327113 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -36,7 +36,7 @@ public class OutboxProducer private final OutboxRepository repository; private final KafkaProducer producer; private final String topic; - private final Watermarks watermarks; + private final Set send = new HashSet<>(); private final Clock clock; private final Duration cleanupInterval; @@ -78,8 +78,6 @@ public class OutboxProducer KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.assign(assignment); - this.watermarks = new Watermarks(partitions.size()); - long[] currentOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.committed(assignment).entrySet()) { @@ -96,6 +94,7 @@ public class OutboxProducer } LOG.info("End-offsets: {}", endOffsets); + int deleted = 0; while(!Arrays.equals(currentOffsets, endOffsets)) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); @@ -103,23 +102,25 @@ public class OutboxProducer records.forEach(record -> { long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value()); - LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber); - watermarks.set(record.partition(), recordSequenceNumber); + LOG.debug( + "Found message #{} on offset {} of partition {}", + recordSequenceNumber, + record.offset(), + record.partition()); + send.add(recordSequenceNumber); currentOffsets[record.partition()] = record.offset(); }); + deleted += cleanUp(); LOG.debug("Current offsets: {}", currentOffsets); } - LOG.info("Found watermarks: {}", watermarks); - - sequenceNumber = watermarks.getLowest(); - LOG.info("Restored sequence-number: {}", sequenceNumber); + LOG.info("Cleaned up {} already send entries from outbox table", deleted); consumer.close(); this.clock = clock; this.cleanupInterval = properties.cleanupInterval; - this.nextCleanup = LocalTime.now(clock); + this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval); } @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") @@ -134,17 +135,22 @@ public class OutboxProducer send(item); if (nextCleanup.isBefore(LocalTime.now(clock))) { - int deleted = repository.delete(watermarks.getLowest()); + cleanUp(); nextCleanup = LocalTime.now(clock).plus(cleanupInterval); - LOG.info( - "Cleaned up {} entries from outbox, next clean-up: {}", - deleted, - nextCleanup); + LOG.debug("Next clean-up: {}", nextCleanup); } } while (items.size() > 0); } + int cleanUp() + { + int deleted = repository.delete(send); + LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size()); + send.clear(); + return deleted; + } + void send(OutboxItem item) { final ProducerRecord record = @@ -157,7 +163,7 @@ public class OutboxProducer { if (metadata != null) { - watermarks.set(metadata.partition(), item.getSequenceNumber()); + send.add(item.getSequenceNumber()); LOG.info( "{}/{}:{} - {}:{}={}", metadata.topic(),