X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=f0a226eb22629ff6772c16bec07c515c605b23c2;hb=313d30f59c98c36b7706a14c59417853c502b490;hp=331374df1d71727344107a4245f2472c4a6b7400;hpb=a92d318043bf698dc2a949db2b76893c8abf03a1;p=demos%2Fkafka%2Foutbox diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 331374d..f0a226e 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -36,7 +36,7 @@ public class OutboxProducer private final OutboxRepository repository; private final KafkaProducer producer; private final String topic; - private final Watermarks watermarks; + private final Set send = new HashSet<>(); private final Clock clock; private final Duration cleanupInterval; @@ -62,11 +62,10 @@ public class OutboxProducer props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put(GROUP_ID_CONFIG, "outbox"); + props.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Arrays.asList(this.topic)); List partitions = consumer.listTopics().get(this.topic); Set assignment = new HashSet<>(); for (PartitionInfo info : partitions) @@ -77,24 +76,34 @@ public class OutboxProducer LOG.info("Using topic {} with {} partitions", topic, partitions); - this.watermarks = new Watermarks(partitions.size()); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.assign(assignment); long[] currentOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.committed(assignment).entrySet()) { - LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); - currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + if (entry.getValue() == null) + { + LOG.debug("Found no offset for partition {}", entry.getKey()); + currentOffsets[entry.getKey().partition()] = -1l; + } + else + { + LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); + currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + } } LOG.info("Current offsets: {}", currentOffsets); long[] endOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.endOffsets(assignment).entrySet()) { - LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); + LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); endOffsets[entry.getKey().partition()] = entry.getValue() - 1l; } LOG.info("End-offsets: {}", endOffsets); + int deleted = 0; while(!Arrays.equals(currentOffsets, endOffsets)) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); @@ -102,23 +111,25 @@ public class OutboxProducer records.forEach(record -> { long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value()); - LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber); - watermarks.set(record.partition(), recordSequenceNumber); + LOG.debug( + "Found message #{} on offset {} of partition {}", + recordSequenceNumber, + record.offset(), + record.partition()); + send.add(recordSequenceNumber); currentOffsets[record.partition()] = record.offset(); }); + deleted += cleanUp(); LOG.debug("Current offsets: {}", currentOffsets); } - LOG.info("Found watermarks: {}", watermarks); - - sequenceNumber = watermarks.getLowest(); - LOG.info("Restored sequence-number: {}", sequenceNumber); + LOG.info("Cleaned up {} already send entries from outbox table", deleted); consumer.close(); this.clock = clock; this.cleanupInterval = properties.cleanupInterval; - this.nextCleanup = LocalTime.now(clock); + this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval); } @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") @@ -133,17 +144,22 @@ public class OutboxProducer send(item); if (nextCleanup.isBefore(LocalTime.now(clock))) { - int deleted = repository.delete(watermarks.getLowest()); + cleanUp(); nextCleanup = LocalTime.now(clock).plus(cleanupInterval); - LOG.info( - "Cleaned up {} entries from outbox, next clean-up: {}", - deleted, - nextCleanup); + LOG.debug("Next clean-up: {}", nextCleanup); } } while (items.size() > 0); } + int cleanUp() + { + int deleted = repository.delete(send); + LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size()); + send.clear(); + return deleted; + } + void send(OutboxItem item) { final ProducerRecord record = @@ -156,7 +172,7 @@ public class OutboxProducer { if (metadata != null) { - watermarks.set(metadata.partition(), item.getSequenceNumber()); + send.add(item.getSequenceNumber()); LOG.info( "{}/{}:{} - {}:{}={}", metadata.topic(),