import com.google.common.primitives.Longs;
import org.apache.kafka.common.serialization.StringSerializer;
+import java.time.Clock;
import java.time.Duration;
+import java.time.LocalTime;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
-@Component
// NOTE(review): this file is a unified-diff fragment — the leading '+'/'-' on some
// lines below are patch markers, not Java syntax. Comments describe the post-patch state.
// Transactional-outbox relay: polls OutboxItem rows from the database and publishes
// them to a Kafka topic, tracking per-partition watermarks for safe cleanup.
public class OutboxProducer
{
final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
// Kafka record-header key under which the outbox sequence number is transmitted
// (patch renames it from the literal "SEQ#" used further down).
+ public final static String HEADER = "#";
private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
private final String topic;
// Added by this patch: per-partition acknowledged sequence numbers (Watermarks is a
// project-local type — contract not visible here), an injectable Clock for testable
// time, and the interval between outbox-table cleanups.
+ private final Watermarks watermarks;
+ private final Clock clock;
+ private final Duration cleanupInterval;
// Highest sequence number handed to the producer so far ('0l' — lowercase L suffix;
// convention would prefer '0L', left untouched since this is a diff hunk).
private long sequenceNumber = 0l;
// Next wall-clock time at which acknowledged rows are deleted from the outbox table.
+ private LocalTime nextCleanup;
// Constructor ('+'/'-' prefixes are patch markers from the surrounding diff).
// The patch adds a Clock parameter (testable time source) and switches the Kafka
// producer configuration from string literals to the ProducerConfig constants
// imported statically at the top of the file.
public OutboxProducer(
ApplicationProperties properties,
- OutboxRepository repository)
+ OutboxRepository repository,
+ Clock clock)
{
this.repository = repository;
Properties props = new Properties();
- props.put("bootstrap.servers", properties.bootstrapServers);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// New: enable the idempotent producer so broker-side retries cannot duplicate
// records (presumably to preserve exactly-once semantics of the outbox relay —
// confirm against the surrounding change description).
+ props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
this.producer = new KafkaProducer<>(props);
this.topic = properties.topic;
+
+ this.watermarks = new Watermarks();
+ this.clock = clock;
+ this.cleanupInterval = properties.cleanupInterval;
// Schedule the first cleanup one full interval from "now" (per injected clock).
+ this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
}
// Scheduled polling loop. Patch replaces the hard-coded 500 ms delay with a
// configurable property. NOTE(review): this diff hunk elides unchanged context —
// 'items' is used at the debug log below without a visible assignment, and the
// trailing 'while (items.size() > 0);' implies an elided 'do {' plus a fetch such
// as 'items = repository.fetch(sequenceNumber);' between the lines shown. Consult
// the full file before editing this method.
- @Scheduled(fixedDelay = 500)
+ @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
public void poll()
{
List<OutboxItem> items;
LOG.debug("Polled {} new items", items.size());
for (OutboxItem item : items)
send(item);
// New: periodic cleanup — delete every outbox row whose sequence number is at or
// below the lowest acknowledged watermark across all partitions, then re-arm the
// timer one cleanupInterval from now.
+ if (nextCleanup.isBefore(LocalTime.now(clock)))
+ {
+ int deleted = repository.delete(watermarks.getLowest());
+ nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
+ LOG.info(
+ "Cleaned up {} entries from outbox, next clean-up: {}",
+ deleted,
+ nextCleanup);
+ }
}
// Keep draining until a fetch returns no items (loop header elided from this hunk).
while (items.size() > 0);
}
// Interior of send(OutboxItem) — the method signature and the 'record' declaration
// precede this hunk, and the else-branch continues past it. '+'/'-' are patch markers.
new ProducerRecord<>(topic, item.getKey(), item.getValue());
sequenceNumber = item.getSequenceNumber();
// Header key now comes from the HEADER constant instead of the literal "SEQ#"
// (NOTE(review): the byte value changes from "SEQ#" to "#" — consumers reading this
// header must be updated in lock-step; confirm downstream compatibility).
- record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
+ record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
// Async send; the callback runs on the producer's I/O thread once the broker
// acknowledges (metadata != null) or the send fails (e != null).
producer.send(record, (metadata, e) ->
{
if (metadata != null)
{
// Patch: stop deleting the row immediately per-ack; instead record the acked
// sequence number per partition, deferring deletion to poll()'s batched cleanup.
- int deleted = repository.delete(item.getSequenceNumber());
+ watermarks.set(metadata.partition(), item.getSequenceNumber());
LOG.info(
- "{}/{}:{} - {}:{}={} - deleted: {}",
+ "{}/{}:{} - {}:{}={}",
metadata.topic(),
metadata.partition(),
metadata.offset(),
item.getSequenceNumber(),
record.key(),
- record.value(),
- deleted);
+ record.value());
}
else
{