1 package de.juplo.kafka.outbox.delivery;
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
6 import java.time.Duration;
8 import java.util.Properties;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.scheduling.annotation.Scheduled;
15 import org.springframework.stereotype.Component;
17 import javax.annotation.PreDestroy;
19 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
20 import static org.apache.kafka.clients.producer.ProducerConfig.*;
24 public class OutboxProducer
26 final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
28 public final static String HEADER = "#";
30 private final OutboxRepository repository;
31 private final KafkaProducer<String, String> producer;
32 private final String topic;
34 private long sequenceNumber = 0l;
36 public OutboxProducer(
37 ApplicationProperties properties,
38 OutboxRepository repository)
40 this.repository = repository;
42 Properties props = new Properties();
43 props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
44 props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
45 props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
46 props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
48 this.producer = new KafkaProducer<>(props);
49 this.topic = properties.topic;
52 @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
55 List<OutboxItem> items;
58 items = repository.fetch(sequenceNumber);
59 LOG.debug("Polled {} new items", items.size());
60 for (OutboxItem item : items)
63 while (items.size() > 0);
66 void send(OutboxItem item)
68 final ProducerRecord<String, String> record =
69 new ProducerRecord<>(topic, item.getKey(), item.getValue());
71 sequenceNumber = item.getSequenceNumber();
72 record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
74 producer.send(record, (metadata, e) ->
78 int deleted = repository.delete(item.getSequenceNumber());
80 "{}/{}:{} - {}:{}={} - deleted: {}",
84 item.getSequenceNumber(),
93 "{}/{} - {}:{}={} -> ",
96 item.getSequenceNumber(),
108 producer.close(Duration.ofSeconds(5));