Moved postage of messages into a reusable standalone implementation
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
1 package de.juplo.kafka.outbox.delivery;
2
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
5
6 import java.time.Duration;
7 import java.util.List;
8 import java.util.Properties;
9
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.scheduling.annotation.Scheduled;
15 import org.springframework.stereotype.Component;
16
17 import javax.annotation.PreDestroy;
18
19
20 @Component
21 public class OutboxProducer
22 {
23   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
24
25
26   private final OutboxRepository repository;
27   private final KafkaProducer<String, String> producer;
28   private final String topic;
29
30   private long sequenceNumber = 0l;
31
32   public OutboxProducer(
33       ApplicationProperties properties,
34       OutboxRepository repository)
35   {
36     this.repository = repository;
37
38     Properties props = new Properties();
39     props.put("bootstrap.servers", properties.bootstrapServers);
40     props.put("key.serializer", StringSerializer.class.getName());
41     props.put("value.serializer", StringSerializer.class.getName());
42
43     this.producer = new KafkaProducer<>(props);
44     this.topic = properties.topic;
45   }
46
47   @Scheduled(fixedDelay = 500)
48   public void poll()
49   {
50     List<OutboxItem> items;
51     do
52     {
53       items = repository.fetch(sequenceNumber);
54       LOG.debug("Polled {} new items", items.size());
55       for (OutboxItem item : items)
56         send(item);
57     }
58     while (items.size() > 0);
59   }
60
61   void send(OutboxItem item)
62   {
63     final ProducerRecord<String, String> record =
64         new ProducerRecord<>(topic, item.getKey(), item.getValue());
65
66     sequenceNumber = item.getSequenceNumber();
67     record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
68
69     producer.send(record, (metadata, e) ->
70     {
71       if (metadata != null)
72       {
73         int deleted = repository.delete(item.getSequenceNumber());
74         LOG.info(
75             "{}/{}:{} - {}:{}={} - deleted: {}",
76             metadata.topic(),
77             metadata.partition(),
78             metadata.offset(),
79             item.getSequenceNumber(),
80             record.key(),
81             record.value(),
82             deleted);
83       }
84       else
85       {
86         // HANDLE ERROR
87         LOG.error(
88             "{}/{} - {}:{}={} -> ",
89             record.topic(),
90             record.partition(),
91             item.getSequenceNumber(),
92             record.key(),
93             record.value(),
94             e);
95       }
96     });
97   }
98
99
100   @PreDestroy
101   public void close()
102   {
103     producer.close(Duration.ofSeconds(5));
104   }
105 }