ac7d4ebff2cbcd44280b469f84bb87805f0d670c
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
1 package de.juplo.kafka.outbox.delivery;
2
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
5
6 import java.time.Clock;
7 import java.time.Duration;
8 import java.time.LocalTime;
9 import java.util.List;
10 import java.util.Properties;
11
12 import org.apache.kafka.clients.producer.KafkaProducer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 import org.springframework.scheduling.annotation.Scheduled;
17
18 import javax.annotation.PreDestroy;
19
20
21 public class OutboxProducer
22 {
23   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
24
25
26   private final OutboxRepository repository;
27   private final KafkaProducer<String, String> producer;
28   private final String topic;
29   private final Watermarks watermarks;
30   private final Clock clock;
31   private final Duration cleanupInterval;
32
33   private long sequenceNumber = 0l;
34   private LocalTime nextCleanup;
35
36   public OutboxProducer(
37       ApplicationProperties properties,
38       OutboxRepository repository,
39       Clock clock)
40   {
41     this.repository = repository;
42
43     Properties props = new Properties();
44     props.put("bootstrap.servers", properties.bootstrapServers);
45     props.put("key.serializer", StringSerializer.class.getName());
46     props.put("value.serializer", StringSerializer.class.getName());
47
48     this.producer = new KafkaProducer<>(props);
49     this.topic = properties.topic;
50
51     this.watermarks = new Watermarks();
52     this.clock = clock;
53     this.cleanupInterval = properties.cleanupInterval;
54     this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
55   }
56
57   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
58   public void poll()
59   {
60     List<OutboxItem> items;
61     do
62     {
63       items = repository.fetch(sequenceNumber);
64       LOG.debug("Polled {} new items", items.size());
65       for (OutboxItem item : items)
66         send(item);
67       if (nextCleanup.isBefore(LocalTime.now(clock)))
68       {
69         int deleted = repository.delete(watermarks.getLowest());
70         nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
71         LOG.info(
72             "Cleaned up {} entries from outbox, next clean-up: {}",
73             deleted,
74             nextCleanup);
75       }
76     }
77     while (items.size() > 0);
78   }
79
80   void send(OutboxItem item)
81   {
82     final ProducerRecord<String, String> record =
83         new ProducerRecord<>(topic, item.getKey(), item.getValue());
84
85     sequenceNumber = item.getSequenceNumber();
86     record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
87
88     producer.send(record, (metadata, e) ->
89     {
90       if (metadata != null)
91       {
92         watermarks.set(metadata.partition(), item.getSequenceNumber());
93         LOG.info(
94             "{}/{}:{} - {}:{}={}",
95             metadata.topic(),
96             metadata.partition(),
97             metadata.offset(),
98             item.getSequenceNumber(),
99             record.key(),
100             record.value());
101       }
102       else
103       {
104         // HANDLE ERROR
105         LOG.error(
106             "{}/{} - {}:{}={} -> ",
107             record.topic(),
108             record.partition(),
109             item.getSequenceNumber(),
110             record.key(),
111             record.value(),
112             e);
113       }
114     });
115   }
116
117
118   @PreDestroy
119   public void close()
120   {
121     producer.close(Duration.ofSeconds(5));
122   }
123 }