1 package de.juplo.kafka.outbox.delivery;
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
6 import java.time.Clock;
7 import java.time.Duration;
8 import java.time.LocalTime;
10 import java.util.Properties;
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 import org.springframework.scheduling.annotation.Scheduled;
18 import javax.annotation.PreDestroy;
/**
 * Fetches items from an {@link OutboxRepository} and sends them to a Kafka
 * topic through a {@link KafkaProducer}; from time to time it also deletes
 * entries from the repository (see the scheduled polling method).
 */
21 public class OutboxProducer
// Logger used for the debug/progress messages of this class.
23 final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
// Source of the outbox items (fetch) and target of the clean-up (delete).
26 private final OutboxRepository repository;
// Kafka producer with String keys and String values.
27 private final KafkaProducer<String, String> producer;
// Name of the topic every record is sent to.
28 private final String topic;
// Receives (partition, sequence number) pairs from the send callback; its
// getLowest() value is handed to repository.delete() during clean-up.
// NOTE(review): the exact semantics of Watermarks are defined elsewhere — confirm.
29 private final Watermarks watermarks;
// Clock used for every LocalTime.now(clock) call in this class.
30 private final Clock clock;
// Amount of time added to "now" to compute the next clean-up time.
31 private final Duration cleanupInterval;
// Sequence number of the item most recently passed to send(); it is the
// argument of repository.fetch(). Starts at 0.
33 private long sequenceNumber = 0l;
// Time of day after which the next clean-up is triggered.
// NOTE(review): LocalTime carries no date, so the isBefore() comparison used
// for the clean-up check may misbehave when now + cleanupInterval wraps past
// midnight — confirm whether an Instant/LocalDateTime is needed here.
34 private LocalTime nextCleanup;
/**
 * Creates the producer: stores the repository, builds a {@link KafkaProducer}
 * from the given properties, and schedules the first clean-up.
 *
 * NOTE(review): the remainder of the parameter list is not visible in this
 * excerpt; only the two parameters below can be documented.
 *
 * @param properties supplies bootstrapServers, topic and cleanupInterval
 * @param repository repository the outbox items are fetched from
 */
36 public OutboxProducer(
37 ApplicationProperties properties,
38 OutboxRepository repository,
41 this.repository = repository;
// Kafka client configuration: broker address from the application
// properties, String serialization for both key and value.
43 Properties props = new Properties();
44 props.put("bootstrap.servers", properties.bootstrapServers);
45 props.put("key.serializer", StringSerializer.class.getName());
46 props.put("value.serializer", StringSerializer.class.getName());
48 this.producer = new KafkaProducer<>(props);
49 this.topic = properties.topic;
51 this.watermarks = new Watermarks();
53 this.cleanupInterval = properties.cleanupInterval;
// First clean-up is due one cleanupInterval after the current time of day.
// NOTE(review): no assignment of clock is visible in this excerpt — confirm
// that it is set before this line, because LocalTime.now(clock) reads it.
54 this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
// Scheduled task: Spring invokes it with a fixed delay that is read from the
// property de.juplo.kafka.outbox.interval.
// NOTE(review): the method signature and several body lines are not visible
// in this excerpt; the comments below describe only the visible statements.
57 @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
60 List<OutboxItem> items;
// Fetch items from the repository, passing the current sequenceNumber.
63 items = repository.fetch(sequenceNumber);
64 LOG.debug("Polled {} new items", items.size());
// Iterate over the fetched items (loop body not visible here — presumably
// each item is handed to send(); confirm).
65 for (OutboxItem item : items)
// Clean-up is due once the stored nextCleanup time lies before "now".
67 if (nextCleanup.isBefore(LocalTime.now(clock)))
// Delete entries using the lowest watermark as the argument; the repository
// returns the number of deleted entries.
69 int deleted = repository.delete(watermarks.getLowest());
// Schedule the following clean-up one cleanupInterval from now.
70 nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
72 "Cleaned up {} entries from outbox, next clean-up: {}",
77 while (items.size() > 0);
/**
 * Sends one outbox item to the configured topic.
 *
 * The item's key and value become the record's key and value, and its
 * sequence number is attached as the "SEQ#" header. In the send callback the
 * sequence number is recorded in the watermarks for the partition reported by
 * the record metadata.
 *
 * @param item the outbox item to send
 */
80 void send(OutboxItem item)
82 final ProducerRecord<String, String> record =
83 new ProducerRecord<>(topic, item.getKey(), item.getValue());
// Remember the sequence number of this item; the next repository.fetch()
// call receives this value.
// NOTE(review): the field is advanced before the asynchronous send has
// completed — confirm how a failed send is supposed to be retried.
85 sequenceNumber = item.getSequenceNumber();
// Attach the sequence number as a byte-array header named "SEQ#".
86 record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
// Hand the record to the producer together with a completion callback.
88 producer.send(record, (metadata, e) ->
// Record the item's sequence number for the partition the record went to.
// NOTE(review): the condition guarding this statement is not visible here;
// presumably it runs only when the send succeeded — confirm.
92 watermarks.set(metadata.partition(), item.getSequenceNumber());
94 "{}/{}:{} - {}:{}={}",
98 item.getSequenceNumber(),
106 "{}/{} - {}:{}={} -> ",
109 item.getSequenceNumber(),
// Close the Kafka producer, passing a 5-second Duration as the timeout.
// NOTE(review): the enclosing method header is not visible in this excerpt;
// presumably this is the @PreDestroy shutdown hook — confirm.
121 producer.close(Duration.ofSeconds(5));