Enabled idempotence for the producer
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
1 package de.juplo.kafka.outbox.delivery;
2
import com.google.common.primitives.Longs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.util.List;
import java.util.Properties;

import javax.annotation.PreDestroy;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
22
23
24 public class OutboxProducer
25 {
26   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
27
28
29   private final OutboxRepository repository;
30   private final KafkaProducer<String, String> producer;
31   private final String topic;
32   private final Watermarks watermarks;
33   private final Clock clock;
34   private final Duration cleanupInterval;
35
36   private long sequenceNumber = 0l;
37   private LocalTime nextCleanup;
38
39   public OutboxProducer(
40       ApplicationProperties properties,
41       OutboxRepository repository,
42       Clock clock)
43   {
44     this.repository = repository;
45
46     Properties props = new Properties();
47     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
48     props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
49     props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
50     props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
51
52     this.producer = new KafkaProducer<>(props);
53     this.topic = properties.topic;
54
55     this.watermarks = new Watermarks();
56     this.clock = clock;
57     this.cleanupInterval = properties.cleanupInterval;
58     this.nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
59   }
60
61   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
62   public void poll()
63   {
64     List<OutboxItem> items;
65     do
66     {
67       items = repository.fetch(sequenceNumber);
68       LOG.debug("Polled {} new items", items.size());
69       for (OutboxItem item : items)
70         send(item);
71       if (nextCleanup.isBefore(LocalTime.now(clock)))
72       {
73         int deleted = repository.delete(watermarks.getLowest());
74         nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
75         LOG.info(
76             "Cleaned up {} entries from outbox, next clean-up: {}",
77             deleted,
78             nextCleanup);
79       }
80     }
81     while (items.size() > 0);
82   }
83
84   void send(OutboxItem item)
85   {
86     final ProducerRecord<String, String> record =
87         new ProducerRecord<>(topic, item.getKey(), item.getValue());
88
89     sequenceNumber = item.getSequenceNumber();
90     record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
91
92     producer.send(record, (metadata, e) ->
93     {
94       if (metadata != null)
95       {
96         watermarks.set(metadata.partition(), item.getSequenceNumber());
97         LOG.info(
98             "{}/{}:{} - {}:{}={}",
99             metadata.topic(),
100             metadata.partition(),
101             metadata.offset(),
102             item.getSequenceNumber(),
103             record.key(),
104             record.value());
105       }
106       else
107       {
108         // HANDLE ERROR
109         LOG.error(
110             "{}/{} - {}:{}={} -> ",
111             record.topic(),
112             record.partition(),
113             item.getSequenceNumber(),
114             record.key(),
115             record.value(),
116             e);
117       }
118     });
119   }
120
121
122   @PreDestroy
123   public void close()
124   {
125     producer.close(Duration.ofSeconds(5));
126   }
127 }