Old entries are removed from the outbox-table in batches
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
1 package de.juplo.kafka.outbox.delivery;
2
import com.google.common.primitives.Longs;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.PreDestroy;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
22
23
24 public class OutboxProducer
25 {
26   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
27
28   public final static String HEADER = "#";
29
30   private final OutboxRepository repository;
31   private final KafkaProducer<String, String> producer;
32   private final String topic;
33   private final Watermarks watermarks;
34   private final Clock clock;
35   private final Duration cleanupInterval;
36
37   private long sequenceNumber = 0l;
38   private LocalTime nextCleanup;
39
40   public OutboxProducer(
41       ApplicationProperties properties,
42       OutboxRepository repository,
43       Clock clock)
44   {
45     this.repository = repository;
46
47     Properties props = new Properties();
48     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
49     props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
50     props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
51     props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
52
53     this.producer = new KafkaProducer<>(props);
54     this.topic = properties.topic;
55
56     this.watermarks = new Watermarks();
57     this.clock = clock;
58     this.cleanupInterval = properties.cleanupInterval;
59     this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
60   }
61
62   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
63   public void poll()
64   {
65     List<OutboxItem> items;
66     do
67     {
68       items = repository.fetch(sequenceNumber);
69       LOG.debug("Polled {} new items", items.size());
70       for (OutboxItem item : items)
71         send(item);
72       if (nextCleanup.isBefore(LocalTime.now(clock)))
73       {
74         int deleted = repository.delete(watermarks.getLowest());
75         nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
76         LOG.info(
77             "Cleaned up {} entries from outbox, next clean-up: {}",
78             deleted,
79             nextCleanup);
80       }
81     }
82     while (items.size() > 0);
83   }
84
85   void send(OutboxItem item)
86   {
87     final ProducerRecord<String, String> record =
88         new ProducerRecord<>(topic, item.getKey(), item.getValue());
89
90     sequenceNumber = item.getSequenceNumber();
91     record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
92
93     producer.send(record, (metadata, e) ->
94     {
95       if (metadata != null)
96       {
97         watermarks.set(metadata.partition(), item.getSequenceNumber());
98         LOG.info(
99             "{}/{}:{} - {}:{}={}",
100             metadata.topic(),
101             metadata.partition(),
102             metadata.offset(),
103             item.getSequenceNumber(),
104             record.key(),
105             record.value());
106       }
107       else
108       {
109         // HANDLE ERROR
110         LOG.error(
111             "{}/{} - {}:{}={} -> ",
112             record.topic(),
113             record.partition(),
114             item.getSequenceNumber(),
115             record.key(),
116             record.value(),
117             e);
118       }
119     });
120   }
121
122
123   @PreDestroy
124   public void close()
125   {
126     producer.close(Duration.ofSeconds(5));
127   }
128 }