Moved the name of the header for the sequence-number into a static field
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
1 package de.juplo.kafka.outbox.delivery;
2
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
5
6 import java.time.Duration;
7 import java.util.List;
8 import java.util.Properties;
9
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.scheduling.annotation.Scheduled;
15 import org.springframework.stereotype.Component;
16
17 import javax.annotation.PreDestroy;
18
19 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
20 import static org.apache.kafka.clients.producer.ProducerConfig.*;
21
22
23 @Component
24 public class OutboxProducer
25 {
26   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
27
28   public final static String HEADER = "#";
29
30   private final OutboxRepository repository;
31   private final KafkaProducer<String, String> producer;
32   private final String topic;
33
34   private long sequenceNumber = 0l;
35
36   public OutboxProducer(
37       ApplicationProperties properties,
38       OutboxRepository repository)
39   {
40     this.repository = repository;
41
42     Properties props = new Properties();
43     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
44     props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
45     props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
46     props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
47
48     this.producer = new KafkaProducer<>(props);
49     this.topic = properties.topic;
50   }
51
52   @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
53   public void poll()
54   {
55     List<OutboxItem> items;
56     do
57     {
58       items = repository.fetch(sequenceNumber);
59       LOG.debug("Polled {} new items", items.size());
60       for (OutboxItem item : items)
61         send(item);
62     }
63     while (items.size() > 0);
64   }
65
66   void send(OutboxItem item)
67   {
68     final ProducerRecord<String, String> record =
69         new ProducerRecord<>(topic, item.getKey(), item.getValue());
70
71     sequenceNumber = item.getSequenceNumber();
72     record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
73
74     producer.send(record, (metadata, e) ->
75     {
76       if (metadata != null)
77       {
78         int deleted = repository.delete(item.getSequenceNumber());
79         LOG.info(
80             "{}/{}:{} - {}:{}={} - deleted: {}",
81             metadata.topic(),
82             metadata.partition(),
83             metadata.offset(),
84             item.getSequenceNumber(),
85             record.key(),
86             record.value(),
87             deleted);
88       }
89       else
90       {
91         // HANDLE ERROR
92         LOG.error(
93             "{}/{} - {}:{}={} -> ",
94             record.topic(),
95             record.partition(),
96             item.getSequenceNumber(),
97             record.key(),
98             record.value(),
99             e);
100       }
101     });
102   }
103
104
105   @PreDestroy
106   public void close()
107   {
108     producer.close(Duration.ofSeconds(5));
109   }
110 }