Enabled idempotence for the producer
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
1 package de.juplo.kafka.outbox.delivery;
2
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
5
6 import java.time.Duration;
7 import java.util.List;
8 import java.util.Properties;
9
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.scheduling.annotation.Scheduled;
15 import org.springframework.stereotype.Component;
16
17 import javax.annotation.PreDestroy;
18
19 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
20 import static org.apache.kafka.clients.producer.ProducerConfig.*;
21
22
23 @Component
24 public class OutboxProducer
25 {
26   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
27
28
29   private final OutboxRepository repository;
30   private final KafkaProducer<String, String> producer;
31   private final String topic;
32
33   private long sequenceNumber = 0l;
34
35   public OutboxProducer(
36       ApplicationProperties properties,
37       OutboxRepository repository)
38   {
39     this.repository = repository;
40
41     Properties props = new Properties();
42     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
43     props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
44     props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
45     props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
46
47     this.producer = new KafkaProducer<>(props);
48     this.topic = properties.topic;
49   }
50
51   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
52   public void poll()
53   {
54     List<OutboxItem> items;
55     do
56     {
57       items = repository.fetch(sequenceNumber);
58       LOG.debug("Polled {} new items", items.size());
59       for (OutboxItem item : items)
60         send(item);
61     }
62     while (items.size() > 0);
63   }
64
65   void send(OutboxItem item)
66   {
67     final ProducerRecord<String, String> record =
68         new ProducerRecord<>(topic, item.getKey(), item.getValue());
69
70     sequenceNumber = item.getSequenceNumber();
71     record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
72
73     producer.send(record, (metadata, e) ->
74     {
75       if (metadata != null)
76       {
77         int deleted = repository.delete(item.getSequenceNumber());
78         LOG.info(
79             "{}/{}:{} - {}:{}={} - deleted: {}",
80             metadata.topic(),
81             metadata.partition(),
82             metadata.offset(),
83             item.getSequenceNumber(),
84             record.key(),
85             record.value(),
86             deleted);
87       }
88       else
89       {
90         // HANDLE ERROR
91         LOG.error(
92             "{}/{} - {}:{}={} -> ",
93             record.topic(),
94             record.partition(),
95             item.getSequenceNumber(),
96             record.key(),
97             record.value(),
98             e);
99       }
100     });
101   }
102
103
104   @PreDestroy
105   public void close()
106   {
107     producer.close(Duration.ofSeconds(5));
108   }
109 }