Fixed bug on first start-up (no current offsets)
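On the first start-up no offsets have been committed yet for the consumer group, so KafkaConsumer.committed() maps those partitions to null. The start-up synchronization now treats such partitions explicitly as having no current offset (-1) instead of failing on the missing value.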
[demos/kafka/outbox] delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
package de.juplo.kafka.outbox.delivery;

import com.google.common.primitives.Longs;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Clock;
import java.time.Duration;
import java.time.LocalTime;
import java.util.*;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.PreDestroy;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.*;


public class OutboxProducer
{
  final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);

  public final static String HEADER = "#";

  private final OutboxRepository repository;
  private final KafkaProducer<String, String> producer;
  private final String topic;
  // Sequence numbers of items that were already sent; deleted from the outbox on the next clean-up
  private final Set<Long> send = new HashSet<>();
  private final Clock clock;
  private final Duration cleanupInterval;

  private long sequenceNumber = 0L;
  private LocalTime nextCleanup;
  public OutboxProducer(
      ApplicationProperties properties,
      OutboxRepository repository,
      Clock clock)
  {
    this.repository = repository;

    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ENABLE_IDEMPOTENCE_CONFIG, true);

    this.producer = new KafkaProducer<>(props);
    this.topic = properties.topic;

    props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
    props.put(GROUP_ID_CONFIG, "outbox");
    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(this.topic));
    List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
    Set<TopicPartition> assignment = new HashSet<>();
    for (PartitionInfo info : partitions)
    {
      LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
      assignment.add(new TopicPartition(info.topic(), info.partition()));
    }

    LOG.info("Using topic {} with {} partitions", topic, partitions.size());

    long[] currentOffsets = new long[partitions.size()];
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
    {
      if (entry.getValue() == null)
      {
        // On the first start-up no offsets have been committed yet:
        // committed() maps such partitions to null instead of an OffsetAndMetadata
        LOG.debug("Found no offset for partition {}", entry.getKey());
        currentOffsets[entry.getKey().partition()] = -1L;
      }
      else
      {
        LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
        currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1L;
      }
    }
    LOG.info("Current offsets: {}", currentOffsets);

    long[] endOffsets = new long[partitions.size()];
    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
    {
      // endOffsets() yields the offset of the *next* record to be written, hence the -1
      LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
      endOffsets[entry.getKey().partition()] = entry.getValue() - 1L;
    }
    LOG.info("End-offsets: {}", endOffsets);

    int deleted = 0;
    // Re-read everything that was already sent, so that the corresponding
    // outbox entries can be deleted instead of being published a second time
    while (!Arrays.equals(currentOffsets, endOffsets))
    {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      LOG.debug("Fetched {} records", records.count());
      records.forEach(record ->
      {
        long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
        LOG.debug(
            "Found message #{} on offset {} of partition {}",
            recordSequenceNumber,
            record.offset(),
            record.partition());
        send.add(recordSequenceNumber);
        currentOffsets[record.partition()] = record.offset();
      });
      deleted += cleanUp();
      LOG.debug("Current offsets: {}", currentOffsets);
    }

    LOG.info("Cleaned up {} already sent entries from the outbox table", deleted);

    consumer.close();

    this.clock = clock;
    this.cleanupInterval = properties.cleanupInterval;
    this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
  }

  @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
  public void poll()
  {
    List<OutboxItem> items;
    do
    {
      items = repository.fetch(sequenceNumber);
      LOG.debug("Polled {} new items", items.size());
      for (OutboxItem item : items)
        send(item);
      if (nextCleanup.isBefore(LocalTime.now(clock)))
      {
        cleanUp();
        nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
        LOG.debug("Next clean-up: {}", nextCleanup);
      }
    }
    while (!items.isEmpty());
  }

  int cleanUp()
  {
    int deleted = repository.delete(send);
    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
    send.clear();
    return deleted;
  }

  void send(OutboxItem item)
  {
    final ProducerRecord<String, String> record =
        new ProducerRecord<>(topic, item.getKey(), item.getValue());

    sequenceNumber = item.getSequenceNumber();
    record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));

    producer.send(record, (metadata, e) ->
    {
      // A non-null exception is the reliable error indicator: on failure the
      // callback may still receive a (mostly empty) metadata object
      if (e == null)
      {
        send.add(item.getSequenceNumber());
        LOG.info(
            "{}/{}:{} - {}:{}={}",
            metadata.topic(),
            metadata.partition(),
            metadata.offset(),
            item.getSequenceNumber(),
            record.key(),
            record.value());
      }
      else
      {
        // HANDLE ERROR
        LOG.error(
            "{}/{} - {}:{}={} -> ",
            record.topic(),
            record.partition(),
            item.getSequenceNumber(),
            record.key(),
            record.value(),
            e);
      }
    });
  }


  @PreDestroy
  public void close()
  {
    producer.close(Duration.ofSeconds(5));
  }
}
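For context, a minimal sketch of how this class could be wired up in Spring. This is an illustration, not part of the commit: the configuration class name and the use of Clock.systemDefaultZone() are assumptions, and @EnableScheduling (or an equivalent) must be active somewhere for the @Scheduled poll() to fire.

import java.time.Clock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class OutboxDeliveryConfiguration
{
  @Bean
  public OutboxProducer outboxProducer(
      ApplicationProperties properties,
      OutboxRepository repository)
  {
    // The Clock is injected explicitly so that tests can substitute a fixed clock
    return new OutboxProducer(properties, repository, Clock.systemDefaultZone());
  }
}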