627ca0578fc3a41d78f90183b2669b388c5bf2c3
[demos/kafka/outbox] / outbox / src / main / java / de / juplo / kafka / outbox / OutboxProducer.java
1 package de.juplo.kafka.outbox;
2
3 import org.apache.kafka.common.serialization.StringSerializer;
4
5 import java.nio.ByteBuffer;
6 import java.time.Duration;
7 import java.util.List;
8 import java.util.Properties;
9
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.scheduling.annotation.Scheduled;
15 import org.springframework.stereotype.Component;
16
17 import javax.annotation.PreDestroy;
18
19
20 @Component
21 public class OutboxProducer
22 {
23   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
24
25
26   private final OutboxRepository repository;
27   private final KafkaProducer<String, String> producer;
28   private final String topic;
29
30   private long sequenceNumber = 0l;
31
32   public OutboxProducer(
33       ApplicationProperties properties,
34       OutboxRepository repository)
35   {
36     this.repository = repository;
37
38     Properties props = new Properties();
39     props.put("bootstrap.servers", properties.bootstrapServers);
40     props.put("key.serializer", StringSerializer.class.getName());
41     props.put("value.serializer", StringSerializer.class.getName());
42
43     this.producer = new KafkaProducer<>(props);
44     this.topic = properties.topic;
45   }
46
47   @Scheduled(fixedDelay = 500)
48   public void poll()
49   {
50     List<OutboxItem> items;
51     do
52     {
53       items = repository.fetch(sequenceNumber);
54       LOG.debug("Polled {} new items", items.size());
55       for (OutboxItem item : items)
56         send(item);
57     }
58     while (items.size() > 0);
59   }
60
61   void send(OutboxItem item)
62   {
63     final ProducerRecord<String, String> record =
64         new ProducerRecord<>(topic, item.getKey(), item.getValue());
65
66     sequenceNumber = item.getSequenceNumber();
67     ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
68     buffer.putLong(item.getSequenceNumber());
69     record.headers().add("SEQ#", buffer.array());
70
71     producer.send(record, (metadata, e) ->
72     {
73       if (metadata != null)
74       {
75         int deleted = repository.delete(item.getSequenceNumber());
76         LOG.info(
77             "{}/{}:{} - {}:{}={} - deleted: {}",
78             metadata.topic(),
79             metadata.partition(),
80             metadata.offset(),
81             item.getSequenceNumber(),
82             record.key(),
83             record.value(),
84             deleted);
85       }
86       else
87       {
88         // HANDLE ERROR
89         LOG.error(
90             "{}/{} - {}:{}={} -> ",
91             record.topic(),
92             record.partition(),
93             item.getSequenceNumber(),
94             record.key(),
95             record.value(),
96             e);
97       }
98     });
99   }
100
101
102   @PreDestroy
103   public void close()
104   {
105     producer.close(Duration.ofSeconds(5));
106   }
107 }