1 package de.juplo.kafka.outbox;
3 import com.google.common.primitives.Longs;
4 import org.apache.kafka.common.serialization.StringSerializer;
6 import java.time.Duration;
8 import java.util.Properties;
10 import org.apache.kafka.clients.producer.KafkaProducer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.scheduling.annotation.Scheduled;
15 import org.springframework.stereotype.Component;
17 import javax.annotation.PreDestroy;
/**
 * Forwards outbox items to a Kafka topic.
 *
 * <p>Holds an {@link OutboxRepository}, a {@link KafkaProducer} with String
 * keys and values, the name of the target topic, and the sequence number of
 * the item that was most recently passed to {@code send(OutboxItem)}.
 */
21 public class OutboxProducer
// SLF4J logger for this class.
23 final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
// Repository the outbox items are fetched from and deleted from.
26 private final OutboxRepository repository;
// Kafka producer created in the constructor; keys and values are Strings.
27 private final KafkaProducer<String, String> producer;
// Name of the topic every record is sent to.
28 private final String topic;
// Sequence number of the last item handed to send(); starts at 0 (long
// literal) and is the argument of repository.fetch(...) on each fetch.
30 private long sequenceNumber = 0l;
/**
 * Stores the repository, builds the Kafka producer, and remembers the topic.
 *
 * @param properties supplies {@code bootstrapServers} and {@code topic}
 * @param repository repository used for fetching and deleting outbox items
 */
32 public OutboxProducer(
33 ApplicationProperties properties,
34 OutboxRepository repository)
36 this.repository = repository;
// Producer configuration: broker address from the application properties,
// StringSerializer for both record keys and record values.
38 Properties props = new Properties();
39 props.put("bootstrap.servers", properties.bootstrapServers);
40 props.put("key.serializer", StringSerializer.class.getName());
41 props.put("value.serializer", StringSerializer.class.getName());
43 this.producer = new KafkaProducer<>(props);
44 this.topic = properties.topic;
/**
 * Scheduled task (Spring {@code fixedDelay} of 500, i.e. milliseconds
 * measured from the end of one run to the start of the next).
 *
 * <p>Fetches items from the repository using the current
 * {@code sequenceNumber}, logs how many were returned, and iterates over
 * them. The trailing {@code while} condition repeats the fetch for as long
 * as the last fetch returned at least one item.
 */
47 @Scheduled(fixedDelay = 500)
50 List<OutboxItem> items;
// NOTE(review): presumably fetch() returns only items whose sequence number
// is greater than the argument - confirm against OutboxRepository.
53 items = repository.fetch(sequenceNumber);
54 LOG.debug("Polled {} new items", items.size());
// NOTE(review): presumably each item is passed to send(item) - confirm.
55 for (OutboxItem item : items)
// Loop again until a fetch comes back empty.
58 while (items.size() > 0);
/**
 * Sends one outbox item to Kafka as a record on {@code topic}, using the
 * item's key and value, and deletes the item from the repository inside the
 * producer's completion callback.
 *
 * @param item the outbox item to send
 */
61 void send(OutboxItem item)
63 final ProducerRecord<String, String> record =
64 new ProducerRecord<>(topic, item.getKey(), item.getValue());
// The field is advanced here, before the asynchronous send below has
// completed. NOTE(review): if the send later fails, sequenceNumber has
// already moved past this item - check whether it can still be re-fetched.
66 sequenceNumber = item.getSequenceNumber();
// Attach the sequence number as a record header named "SEQ#", encoded by
// Guava's Longs.toByteArray (8 bytes, big-endian).
67 record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
// Asynchronous send; the lambda receives the record metadata and an
// exception once the send has completed.
69 producer.send(record, (metadata, e) ->
// Remove the item from the repository; the returned count is logged below.
73 int deleted = repository.delete(item.getSequenceNumber());
// Log format that ends with the delete count.
75 "{}/{}:{} - {}:{}={} - deleted: {}",
79 item.getSequenceNumber(),
// NOTE(review): presumably the log format of the failure branch, with the
// exception following the "-> " - confirm.
88 "{}/{} - {}:{}={} -> ",
91 item.getSequenceNumber(),
// Close the producer, passing a timeout of 5 seconds.
103 producer.close(Duration.ofSeconds(5));