private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
- private final Watermarks watermarks;
+ private final Set<Long> send = new HashSet<>();
private final Clock clock;
private final Duration cleanupInterval;
props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
props.put(GROUP_ID_CONFIG, "outbox");
+ props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(this.topic));
List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
Set<TopicPartition> assignment = new HashSet<>();
for (PartitionInfo info : partitions)
LOG.info("Using topic {} with {} partitions", topic, partitions);
- this.watermarks = new Watermarks(partitions.size());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.assign(assignment);
long[] currentOffsets = new long[partitions.size()];
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
{
- LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
- currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+ if (entry.getValue() == null)
+ {
+ LOG.debug("Found no offset for partition {}", entry.getKey());
+ currentOffsets[entry.getKey().partition()] = -1l;
+ }
+ else
+ {
+ LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+ currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+ }
}
LOG.info("Current offsets: {}", currentOffsets);
long[] endOffsets = new long[partitions.size()];
for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
{
- LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+ LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
}
LOG.info("End-offsets: {}", endOffsets);
+ int deleted = 0;
while(!Arrays.equals(currentOffsets, endOffsets))
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->
{
long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
- LOG.debug("Found watermark partition[{}]={}", record.partition(), recordSequenceNumber);
- watermarks.set(record.partition(), recordSequenceNumber);
+ LOG.debug(
+ "Found message #{} on offset {} of partition {}",
+ recordSequenceNumber,
+ record.offset(),
+ record.partition());
+ send.add(recordSequenceNumber);
currentOffsets[record.partition()] = record.offset();
});
+ deleted += cleanUp();
LOG.debug("Current offsets: {}", currentOffsets);
}
- LOG.info("Found watermarks: {}", watermarks);
-
- sequenceNumber = watermarks.getLowest();
- LOG.info("Restored sequence-number: {}", sequenceNumber);
+ LOG.info("Cleaned up {} already send entries from outbox table", deleted);
consumer.close();
this.clock = clock;
this.cleanupInterval = properties.cleanupInterval;
- this.nextCleanup = LocalTime.now(clock);
+ this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
}
@Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
send(item);
if (nextCleanup.isBefore(LocalTime.now(clock)))
{
- int deleted = repository.delete(watermarks.getLowest());
+ cleanUp();
nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
- LOG.info(
- "Cleaned up {} entries from outbox, next clean-up: {}",
- deleted,
- nextCleanup);
+ LOG.debug("Next clean-up: {}", nextCleanup);
}
}
while (items.size() > 0);
}
+ int cleanUp()
+ {
+ int deleted = repository.delete(send);
+ LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
+ send.clear();
+ return deleted;
+ }
+
void send(OutboxItem item)
{
final ProducerRecord<String, String> record =
{
if (metadata != null)
{
- watermarks.set(metadata.partition(), item.getSequenceNumber());
+ send.add(item.getSequenceNumber());
LOG.info(
"{}/{}:{} - {}:{}={}",
metadata.topic(),