1 package de.juplo.kafka.outbox.delivery;
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.clients.consumer.ConsumerRecords;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
7 import org.apache.kafka.common.PartitionInfo;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.serialization.StringDeserializer;
10 import org.apache.kafka.common.serialization.StringSerializer;
12 import java.time.Clock;
13 import java.time.Duration;
14 import java.time.LocalTime;
17 import org.apache.kafka.clients.producer.KafkaProducer;
18 import org.apache.kafka.clients.producer.ProducerRecord;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import org.springframework.scheduling.annotation.Scheduled;
23 import javax.annotation.PreDestroy;
25 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
26 import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
27 import static org.apache.kafka.clients.producer.ProducerConfig.*;
30 public class OutboxProducer
32 final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
34 public final static String HEADER = "#";
36 private final OutboxRepository repository;
37 private final KafkaProducer<String, String> producer;
38 private final String topic;
39 private final Set<Long> send = new HashSet<>();
40 private final Clock clock;
41 private final Duration cleanupInterval;
43 private long sequenceNumber = 0l;
44 private LocalTime nextCleanup;
46 public OutboxProducer(
47 ApplicationProperties properties,
48 OutboxRepository repository,
51 this.repository = repository;
53 Properties props = new Properties();
54 props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
55 props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
56 props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
57 props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
59 this.producer = new KafkaProducer<>(props);
60 this.topic = properties.topic;
62 props = new Properties();
63 props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
64 props.put(GROUP_ID_CONFIG, "outbox");
65 props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
66 props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
67 props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
69 List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
70 Set<TopicPartition> assignment = new HashSet<>();
71 for (PartitionInfo info : partitions)
73 LOG.debug("Found {}/{} (ISR: {})", info.topic(), info.partition(), info.inSyncReplicas());
74 assignment.add(new TopicPartition(info.topic(), info.partition()));
77 LOG.info("Using topic {} with {} partitions", topic, partitions);
79 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
80 consumer.assign(assignment);
82 long[] currentOffsets = new long[partitions.size()];
83 for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
85 if (entry.getValue() == null)
87 LOG.debug("Found no offset for partition {}", entry.getKey());
88 currentOffsets[entry.getKey().partition()] = -1l;
92 LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
93 currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
96 LOG.info("Current offsets: {}", currentOffsets);
98 long[] endOffsets = new long[partitions.size()];
99 for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
101 LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
102 endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
104 LOG.info("End-offsets: {}", endOffsets);
107 while(!Arrays.equals(currentOffsets, endOffsets))
109 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
110 LOG.debug("Fetched {} records", records.count());
111 records.forEach(record ->
113 long recordSequenceNumber = Longs.fromByteArray(record.headers().lastHeader(HEADER).value());
115 "Found message #{} on offset {} of partition {}",
116 recordSequenceNumber,
119 send.add(recordSequenceNumber);
120 currentOffsets[record.partition()] = record.offset();
122 deleted += cleanUp();
123 LOG.debug("Current offsets: {}", currentOffsets);
126 LOG.info("Cleaned up {} already send entries from outbox table", deleted);
131 this.cleanupInterval = properties.cleanupInterval;
132 this.nextCleanup = LocalTime.now(clock).plus(this.cleanupInterval);
135 @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
138 List<OutboxItem> items;
141 items = repository.fetch(sequenceNumber);
142 LOG.debug("Polled {} new items", items.size());
143 for (OutboxItem item : items)
145 if (nextCleanup.isBefore(LocalTime.now(clock)))
148 nextCleanup = LocalTime.now(clock).plus(cleanupInterval);
149 LOG.debug("Next clean-up: {}", nextCleanup);
152 while (items.size() > 0);
157 int deleted = repository.delete(send);
158 LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
163 void send(OutboxItem item)
165 final ProducerRecord<String, String> record =
166 new ProducerRecord<>(topic, item.getKey(), item.getValue());
168 sequenceNumber = item.getSequenceNumber();
169 record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
171 producer.send(record, (metadata, e) ->
173 if (metadata != null)
175 send.add(item.getSequenceNumber());
177 "{}/{}:{} - {}:{}={}",
179 metadata.partition(),
181 item.getSequenceNumber(),
189 "{}/{} - {}:{}={} -> ",
192 item.getSequenceNumber(),
204 producer.close(Duration.ofSeconds(5));